aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-28 00:19:10 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-28 00:19:10 +0200
commit8afb4d30750840f66d9f97c2c54a893d3934c45e (patch)
treeca3edd785513bcb5b9bcb3fb3847db34bfdebc71 /jsonrpc
parentd1c816660c0583db33d160e2ef3e980bef0d5a85 (diff)
jsonrpc: enable concurrent requests in `Client`
Diffstat (limited to 'jsonrpc')
-rw-r--r--jsonrpc/examples/client.rs18
-rw-r--r--jsonrpc/src/client/builder.rs180
-rw-r--r--jsonrpc/src/client/mod.rs324
-rw-r--r--jsonrpc/src/lib.rs5
-rw-r--r--jsonrpc/src/server/builder.rs173
-rw-r--r--jsonrpc/src/server/mod.rs176
6 files changed, 453 insertions, 423 deletions
diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs
index 3289772..662aacb 100644
--- a/jsonrpc/examples/client.rs
+++ b/jsonrpc/examples/client.rs
@@ -1,4 +1,7 @@
+use std::time::Duration;
+
use serde::{Deserialize, Serialize};
+use smol::Timer;
use karyon_jsonrpc::Client;
@@ -20,6 +23,16 @@ fn main() {
.await
.unwrap();
+ let clientc = client.clone();
+ smol::spawn(async move {
+ loop {
+ Timer::after(Duration::from_millis(500)).await;
+ let result: Pong = clientc.call("Calc.ping", ()).await.unwrap();
+ println!("ping msg result: {:?}", result);
+ }
+ })
+ .detach();
+
let params = Req { x: 10, y: 7 };
let result: u32 = client.call("Calc.add", params).await.unwrap();
println!("result {result}");
@@ -28,10 +41,9 @@ fn main() {
let result: u32 = client.call("Calc.sub", params).await.unwrap();
println!("result {result}");
- let result: Pong = client.call("Calc.ping", ()).await.unwrap();
- println!("result {:?}", result);
-
let result: String = client.call("Calc.version", ()).await.unwrap();
println!("result {result}");
+
+ Timer::after(Duration::from_secs(10)).await;
});
}
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs
new file mode 100644
index 0000000..4bfb5c3
--- /dev/null
+++ b/jsonrpc/src/client/builder.rs
@@ -0,0 +1,180 @@
+use std::{collections::HashMap, sync::Arc};
+
+#[cfg(feature = "smol")]
+use futures_rustls::rustls;
+#[cfg(feature = "tokio")]
+use tokio_rustls::rustls;
+
+use karyon_core::{async_runtime::lock::Mutex, async_util::TaskGroup};
+use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
+
+#[cfg(feature = "ws")]
+use karyon_net::ws::{ClientWsConfig, ClientWssConfig};
+
+#[cfg(feature = "ws")]
+use crate::codec::WsJsonCodec;
+
+use crate::{codec::JsonCodec, Error, Result, TcpConfig};
+
+use super::Client;
+
+const DEFAULT_TIMEOUT: u64 = 3000; // 3s
+
+impl Client {
+ /// Creates a new [`ClientBuilder`]
+ ///
+ /// This function initializes a `ClientBuilder` with the specified endpoint.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?;
+ /// ```
+ pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
+ let endpoint = endpoint.to_endpoint()?;
+ Ok(ClientBuilder {
+ endpoint,
+ timeout: Some(DEFAULT_TIMEOUT),
+ tls_config: None,
+ tcp_config: Default::default(),
+ })
+ }
+}
+
+/// Builder for constructing an RPC [`Client`].
+pub struct ClientBuilder {
+ endpoint: Endpoint,
+ tls_config: Option<(rustls::ClientConfig, String)>,
+ tcp_config: TcpConfig,
+ timeout: Option<u64>,
+}
+
+impl ClientBuilder {
+ /// Set timeout for sending and receiving messages, in milliseconds.
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// let client = Client::builder()?.set_timeout(5000).build()?;
+ /// ```
+ pub fn set_timeout(mut self, timeout: u64) -> Self {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ /// Configure TCP settings for the client.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let tcp_config = TcpConfig::default();
+ /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?;
+ /// ```
+ ///
+ /// This function will return an error if the endpoint does not support TCP protocols.
+ pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tcp_config = config;
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ /// Configure TLS settings for the client.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let tls_config = rustls::ClientConfig::new(...);
+ /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?;
+ /// ```
+ ///
+ /// This function will return an error if the endpoint does not support TLS protocols.
+ pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tls_config = Some((config, dns_name.to_string()));
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ /// Build RPC client from [`ClientBuilder`].
+ ///
+ /// This function creates a new RPC client using the configurations
+ /// specified in the `ClientBuilder`. It returns a `Arc<Client>` on success.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let client = Client::builder(endpoint)?
+ /// .set_timeout(5000)
+ /// .tcp_config(tcp_config)?
+ /// .tls_config(tls_config, "example.com")?
+ /// .build()
+ /// .await?;
+ /// ```
+ pub async fn build(self) -> Result<Arc<Client>> {
+ let conn: Conn<serde_json::Value> = match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::tls::dial(
+ &self.endpoint,
+ ClientTlsConfig {
+ dns_name,
+ client_config: conf,
+ tcp_config: self.tcp_config,
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?,
+ ),
+ },
+ #[cfg(feature = "ws")]
+ Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::ws::dial(
+ &self.endpoint,
+ ClientWsConfig {
+ tcp_config: self.tcp_config,
+ wss_config: Some(ClientWssConfig {
+ dns_name,
+ client_config: conf,
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => {
+ let config = ClientWsConfig {
+ tcp_config: self.tcp_config,
+ wss_config: None,
+ };
+ Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?)
+ }
+ },
+ #[cfg(all(feature = "unix", target_family = "unix"))]
+ Endpoint::Unix(..) => Box::new(
+ karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
+ ),
+ _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ };
+
+ let client = Arc::new(Client {
+ timeout: self.timeout,
+ conn,
+ chans: Mutex::new(HashMap::new()),
+ subscriptions: Mutex::new(HashMap::new()),
+ task_group: TaskGroup::new(),
+ });
+ client.start_background_receiving();
+ Ok(client)
+ }
+}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index b614c95..0666ee0 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -1,33 +1,22 @@
+pub mod builder;
+
use std::{collections::HashMap, sync::Arc, time::Duration};
use log::{debug, error, warn};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::json;
-#[cfg(feature = "smol")]
-use futures_rustls::rustls;
-#[cfg(feature = "tokio")]
-use tokio_rustls::rustls;
-
use karyon_core::{
async_runtime::lock::Mutex,
async_util::{timeout, TaskGroup, TaskResult},
- util::random_64,
+ util::random_32,
};
-use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
-
-#[cfg(feature = "ws")]
-use karyon_net::ws::{ClientWsConfig, ClientWssConfig};
+use karyon_net::Conn;
-#[cfg(feature = "ws")]
-use crate::codec::WsJsonCodec;
-
-use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig};
+use crate::{message, Error, Result, SubscriptionID};
const CHANNEL_CAP: usize = 10;
-const DEFAULT_TIMEOUT: u64 = 3000; // 3s
-
/// Type alias for a subscription to receive notifications.
///
/// The receiver channel is returned by the `subscribe` method to receive
@@ -37,9 +26,8 @@ pub type Subscription = async_channel::Receiver<serde_json::Value>;
/// Represents an RPC client
pub struct Client {
conn: Conn<serde_json::Value>,
- chan_tx: async_channel::Sender<message::Response>,
- chan_rx: async_channel::Receiver<message::Response>,
timeout: Option<u64>,
+ chans: Mutex<HashMap<u32, async_channel::Sender<message::Response>>>,
subscriptions: Mutex<HashMap<SubscriptionID, async_channel::Sender<serde_json::Value>>>,
task_group: TaskGroup,
}
@@ -51,20 +39,7 @@ impl Client {
method: &str,
params: T,
) -> Result<V> {
- let request = self.send_request(method, params).await?;
-
- let response = match self.timeout {
- Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
- None => self.chan_rx.recv().await?,
- };
-
- if let Some(error) = response.error {
- return Err(Error::CallError(error.code, error.message));
- }
-
- if response.id.is_none() || response.id.unwrap() != request.id {
- return Err(Error::InvalidMsg("Invalid response id"));
- }
+ let response = self.send_request(method, params).await?;
match response.result {
Some(result) => Ok(serde_json::from_value::<V>(result)?),
@@ -82,20 +57,7 @@ impl Client {
method: &str,
params: T,
) -> Result<(SubscriptionID, Subscription)> {
- let request = self.send_request(method, params).await?;
-
- let response = match self.timeout {
- Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
- None => self.chan_rx.recv().await?,
- };
-
- if let Some(error) = response.error {
- return Err(Error::SubscribeError(error.code, error.message));
- }
-
- if response.id.is_none() || response.id.unwrap() != request.id {
- return Err(Error::InvalidMsg("Invalid response id"));
- }
+ let response = self.send_request(method, params).await?;
let sub_id = match response.result {
Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
@@ -113,21 +75,7 @@ impl Client {
/// This function sends an unsubscription request for the specified method
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
- let request = self.send_request(method, sub_id).await?;
-
- let response = match self.timeout {
- Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
- None => self.chan_rx.recv().await?,
- };
-
- if let Some(error) = response.error {
- return Err(Error::SubscribeError(error.code, error.message));
- }
-
- if response.id.is_none() || response.id.unwrap() != request.id {
- return Err(Error::InvalidMsg("Invalid response id"));
- }
-
+ let _ = self.send_request(method, sub_id).await?;
self.subscriptions.lock().await.remove(&sub_id);
Ok(())
}
@@ -136,8 +84,8 @@ impl Client {
&self,
method: &str,
params: T,
- ) -> Result<message::Request> {
- let id = random_64();
+ ) -> Result<message::Response> {
+ let id = random_32();
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
id: json!(id),
@@ -157,8 +105,39 @@ impl Client {
}
}
+ let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
+ self.chans.lock().await.insert(id, tx);
+
+ let response = match self.wait_for_response(rx).await {
+ Ok(r) => r,
+ Err(err) => {
+ self.chans.lock().await.remove(&id);
+ return Err(err);
+ }
+ };
+
+ if let Some(error) = response.error {
+ return Err(Error::SubscribeError(error.code, error.message));
+ }
+
+ if *response.id.as_ref().unwrap() != request.id {
+ return Err(Error::InvalidMsg("Invalid response id"));
+ }
+
debug!("--> {request}");
- Ok(request)
+ Ok(response)
+ }
+
+ async fn wait_for_response(
+ &self,
+ rx: async_channel::Receiver<message::Response>,
+ ) -> Result<message::Response> {
+ match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), rx.recv())
+ .await?
+ .map_err(Error::from),
+ None => rx.recv().await.map_err(Error::from),
+ }
}
fn start_background_receiving(self: &Arc<Self>) {
@@ -175,201 +154,54 @@ impl Client {
async move {
loop {
let msg = selfc.conn.recv().await?;
- if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
- debug!("<-- {res}");
- selfc.chan_tx.send(res).await?;
- continue;
- }
-
- if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
- debug!("<-- {nt}");
- let sub_result: message::NotificationResult = match nt.params {
- Some(ref p) => serde_json::from_value(p.clone())?,
- None => return Err(Error::InvalidMsg("Invalid notification msg")),
- };
-
- match selfc
- .subscriptions
- .lock()
- .await
- .get(&sub_result.subscription)
- {
- Some(s) => {
- s.send(sub_result.result.unwrap_or(json!(""))).await?;
- continue;
- }
- None => {
- warn!("Receive unknown notification {}", sub_result.subscription);
- continue;
- }
- }
- }
-
- error!("Receive unexpected msg: {msg}");
- return Err(Error::InvalidMsg("Unexpected msg"));
+ selfc.handle_msg(msg).await?;
}
},
on_failure,
);
}
-}
-
-/// Builder for constructing an RPC [`Client`].
-pub struct ClientBuilder {
- endpoint: Endpoint,
- tls_config: Option<(rustls::ClientConfig, String)>,
- tcp_config: TcpConfig,
- timeout: Option<u64>,
-}
-
-impl ClientBuilder {
- /// Set timeout for sending and receiving messages, in milliseconds.
- ///
- /// # Examples
- ///
- /// ```ignore
- /// let client = Client::builder()?.set_timeout(5000).build()?;
- /// ```
- pub fn set_timeout(mut self, timeout: u64) -> Self {
- self.timeout = Some(timeout);
- self
- }
- /// Configure TCP settings for the client.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let tcp_config = TcpConfig::default();
- /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?;
- /// ```
- ///
- /// This function will return an error if the endpoint does not support TCP protocols.
- pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
- match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
- self.tcp_config = config;
- Ok(self)
+ async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> {
+ if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
+ debug!("<-- {res}");
+ if res.id.is_none() {
+ return Err(Error::InvalidMsg("Response id is none"));
}
- _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- }
- }
- /// Configure TLS settings for the client.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let tls_config = rustls::ClientConfig::new(...);
- /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?;
- /// ```
- ///
- /// This function will return an error if the endpoint does not support TLS protocols.
- pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
- match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
- self.tls_config = Some((config, dns_name.to_string()));
- Ok(self)
+ let id: u32 = serde_json::from_value(res.id.clone().unwrap())?;
+ match self.chans.lock().await.remove(&id) {
+ Some(tx) => tx.send(res).await?,
+ None => return Err(Error::InvalidMsg("Receive unkown message")),
}
- _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+
+ return Ok(());
}
- }
- /// Build RPC client from [`ClientBuilder`].
- ///
- /// This function creates a new RPC client using the configurations
- /// specified in the `ClientBuilder`. It returns a `Arc<Client>` on success.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let client = Client::builder(endpoint)?
- /// .set_timeout(5000)
- /// .tcp_config(tcp_config)?
- /// .tls_config(tls_config, "example.com")?
- /// .build()
- /// .await?;
- /// ```
- pub async fn build(self) -> Result<Arc<Client>> {
- let conn: Conn<serde_json::Value> = match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config {
- Some((conf, dns_name)) => Box::new(
- karyon_net::tls::dial(
- &self.endpoint,
- ClientTlsConfig {
- dns_name,
- client_config: conf,
- tcp_config: self.tcp_config,
- },
- JsonCodec {},
- )
- .await?,
- ),
- None => Box::new(
- karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?,
- ),
- },
- #[cfg(feature = "ws")]
- Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config {
- Some((conf, dns_name)) => Box::new(
- karyon_net::ws::dial(
- &self.endpoint,
- ClientWsConfig {
- tcp_config: self.tcp_config,
- wss_config: Some(ClientWssConfig {
- dns_name,
- client_config: conf,
- }),
- },
- WsJsonCodec {},
- )
- .await?,
- ),
+ if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
+ debug!("<-- {nt}");
+ let sub_result: message::NotificationResult = match nt.params {
+ Some(ref p) => serde_json::from_value(p.clone())?,
+ None => return Err(Error::InvalidMsg("Invalid notification msg")),
+ };
+
+ match self
+ .subscriptions
+ .lock()
+ .await
+ .get(&sub_result.subscription)
+ {
+ Some(s) => {
+ s.send(sub_result.result.unwrap_or(json!(""))).await?;
+ return Ok(());
+ }
None => {
- let config = ClientWsConfig {
- tcp_config: self.tcp_config,
- wss_config: None,
- };
- Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?)
+ warn!("Receive unknown notification {}", sub_result.subscription);
+ return Ok(());
}
- },
- #[cfg(all(feature = "unix", target_family = "unix"))]
- Endpoint::Unix(..) => Box::new(
- karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
- ),
- _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- };
+ }
+ }
- let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
- let client = Arc::new(Client {
- timeout: self.timeout,
- conn,
- chan_tx: tx,
- chan_rx: rx,
- subscriptions: Mutex::new(HashMap::new()),
- task_group: TaskGroup::new(),
- });
- client.start_background_receiving();
- Ok(client)
- }
-}
-impl Client {
- /// Creates a new [`ClientBuilder`]
- ///
- /// This function initializes a `ClientBuilder` with the specified endpoint.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?;
- /// ```
- pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
- let endpoint = endpoint.to_endpoint()?;
- Ok(ClientBuilder {
- endpoint,
- timeout: Some(DEFAULT_TIMEOUT),
- tls_config: None,
- tcp_config: Default::default(),
- })
+ error!("Receive unexpected msg: {msg}");
+ Err(Error::InvalidMsg("Unexpected msg"))
}
}
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index 5577455..14840fa 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -6,13 +6,14 @@ mod error;
pub mod message;
mod server;
-pub use client::{Client, ClientBuilder};
+pub use client::{builder::ClientBuilder, Client};
pub use error::{Error, Result};
pub use server::{
+ builder::ServerBuilder,
channel::{ArcChannel, Channel, Subscription, SubscriptionID},
pubsub_service::{PubSubRPCMethod, PubSubRPCService},
service::{RPCMethod, RPCService},
- Server, ServerBuilder,
+ Server,
};
pub use karyon_jsonrpc_macro::{rpc_impl, rpc_pubsub_impl};
diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs
new file mode 100644
index 0000000..90024f3
--- /dev/null
+++ b/jsonrpc/src/server/builder.rs
@@ -0,0 +1,173 @@
+use std::{collections::HashMap, sync::Arc};
+
+#[cfg(feature = "smol")]
+use futures_rustls::rustls;
+#[cfg(feature = "tokio")]
+use tokio_rustls::rustls;
+
+use karyon_core::{async_runtime::Executor, async_util::TaskGroup};
+use karyon_net::{Endpoint, Listener, ToEndpoint};
+
+#[cfg(feature = "ws")]
+use crate::codec::WsJsonCodec;
+
+#[cfg(feature = "ws")]
+use karyon_net::ws::ServerWsConfig;
+
+use crate::{codec::JsonCodec, Error, PubSubRPCService, RPCService, Result, TcpConfig};
+
+use super::Server;
+
+/// Builder for constructing an RPC [`Server`].
+pub struct ServerBuilder {
+ endpoint: Endpoint,
+ tcp_config: TcpConfig,
+ tls_config: Option<rustls::ServerConfig>,
+ services: HashMap<String, Arc<dyn RPCService + 'static>>,
+ pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>,
+}
+
+impl ServerBuilder {
+ /// Adds a new RPC service to the server.
+ pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
+ self.services.insert(service.name(), service);
+ self
+ }
+
+ /// Adds a new PubSub RPC service to the server.
+ pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
+ self.pubsub_services.insert(service.name(), service);
+ self
+ }
+
+ /// Configure TCP settings for the server.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let tcp_config = TcpConfig::default();
+ /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?;
+ /// ```
+ ///
+ /// This function will return an error if the endpoint does not support TCP protocols.
+ pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tcp_config = config;
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ /// Configure TLS settings for the server.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let tls_config = rustls::ServerConfig::new(...);
+ /// let server = Server::builder()?.tls_config(tls_config)?.build()?;
+ /// ```
+ ///
+ /// This function will return an error if the endpoint does not support TLS protocols.
+ pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tls_config = Some(config);
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ /// Builds the server with the configured options.
+ pub async fn build(self) -> Result<Arc<Server>> {
+ self._build(TaskGroup::new()).await
+ }
+
+ /// Builds the server with the configured options and an executor.
+ pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> {
+ self._build(TaskGroup::with_executor(ex)).await
+ }
+
+ async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> {
+ let listener: Listener<serde_json::Value> = match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::tls::listen(
+ &self.endpoint,
+ karyon_net::tls::ServerTlsConfig {
+ server_config: conf.clone(),
+ tcp_config: self.tcp_config,
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?,
+ ),
+ },
+ #[cfg(feature = "ws")]
+ Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::ws::listen(
+ &self.endpoint,
+ ServerWsConfig {
+ tcp_config: self.tcp_config,
+ wss_config: Some(karyon_net::ws::ServerWssConfig {
+ server_config: conf.clone(),
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => {
+ let config = ServerWsConfig {
+ tcp_config: self.tcp_config,
+ wss_config: None,
+ };
+ Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?)
+ }
+ },
+ #[cfg(all(feature = "unix", target_family = "unix"))]
+ Endpoint::Unix(..) => Box::new(karyon_net::unix::listen(
+ &self.endpoint,
+ Default::default(),
+ JsonCodec {},
+ )?),
+
+ _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ };
+
+ Ok(Arc::new(Server {
+ listener,
+ task_group,
+ services: self.services,
+ pubsub_services: self.pubsub_services,
+ }))
+ }
+}
+
+impl Server {
+ /// Creates a new [`ServerBuilder`]
+ ///
+ /// This function initializes a `ServerBuilder` with the specified endpoint.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?;
+ /// ```
+ pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> {
+ let endpoint = endpoint.to_endpoint()?;
+ Ok(ServerBuilder {
+ endpoint,
+ services: HashMap::new(),
+ pubsub_services: HashMap::new(),
+ tcp_config: Default::default(),
+ tls_config: None,
+ })
+ }
+}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index e1805e1..7f28de2 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -1,3 +1,4 @@
+pub mod builder;
pub mod channel;
pub mod pubsub_service;
pub mod service;
@@ -6,25 +7,10 @@ use std::{collections::HashMap, sync::Arc};
use log::{debug, error, trace, warn};
-#[cfg(feature = "smol")]
-use futures_rustls::rustls;
-#[cfg(feature = "tokio")]
-use tokio_rustls::rustls;
+use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
+use karyon_net::{Conn, Endpoint, Listener};
-use karyon_core::{
- async_runtime::Executor,
- async_util::{select, Either, TaskGroup, TaskResult},
-};
-
-use karyon_net::{Conn, Endpoint, Listener, ToEndpoint};
-
-#[cfg(feature = "ws")]
-use crate::codec::WsJsonCodec;
-
-#[cfg(feature = "ws")]
-use karyon_net::ws::ServerWsConfig;
-
-use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result, TcpConfig};
+use crate::{message, Error, PubSubRPCService, RPCService, Result};
use channel::{ArcChannel, Channel};
@@ -302,157 +288,3 @@ impl Server {
}
}
}
-
-/// Builder for constructing an RPC [`Server`].
-pub struct ServerBuilder {
- endpoint: Endpoint,
- tcp_config: TcpConfig,
- tls_config: Option<rustls::ServerConfig>,
- services: HashMap<String, Arc<dyn RPCService + 'static>>,
- pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>,
-}
-
-impl ServerBuilder {
- /// Adds a new RPC service to the server.
- pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
- self.services.insert(service.name(), service);
- self
- }
-
- /// Adds a new PubSub RPC service to the server.
- pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
- self.pubsub_services.insert(service.name(), service);
- self
- }
-
- /// Configure TCP settings for the server.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let tcp_config = TcpConfig::default();
- /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?;
- /// ```
- ///
- /// This function will return an error if the endpoint does not support TCP protocols.
- pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder> {
- match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
- self.tcp_config = config;
- Ok(self)
- }
- _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- }
- }
-
- /// Configure TLS settings for the server.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let tls_config = rustls::ServerConfig::new(...);
- /// let server = Server::builder()?.tls_config(tls_config)?.build()?;
- /// ```
- ///
- /// This function will return an error if the endpoint does not support TLS protocols.
- pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> {
- match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
- self.tls_config = Some(config);
- Ok(self)
- }
- _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- }
- }
-
- /// Builds the server with the configured options.
- pub async fn build(self) -> Result<Arc<Server>> {
- self._build(TaskGroup::new()).await
- }
-
- /// Builds the server with the configured options and an executor.
- pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> {
- self._build(TaskGroup::with_executor(ex)).await
- }
-
- async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> {
- let listener: Listener<serde_json::Value> = match self.endpoint {
- Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config {
- Some(conf) => Box::new(
- karyon_net::tls::listen(
- &self.endpoint,
- karyon_net::tls::ServerTlsConfig {
- server_config: conf.clone(),
- tcp_config: self.tcp_config,
- },
- JsonCodec {},
- )
- .await?,
- ),
- None => Box::new(
- karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?,
- ),
- },
- #[cfg(feature = "ws")]
- Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config {
- Some(conf) => Box::new(
- karyon_net::ws::listen(
- &self.endpoint,
- ServerWsConfig {
- tcp_config: self.tcp_config,
- wss_config: Some(karyon_net::ws::ServerWssConfig {
- server_config: conf.clone(),
- }),
- },
- WsJsonCodec {},
- )
- .await?,
- ),
- None => {
- let config = ServerWsConfig {
- tcp_config: self.tcp_config,
- wss_config: None,
- };
- Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?)
- }
- },
- #[cfg(all(feature = "unix", target_family = "unix"))]
- Endpoint::Unix(..) => Box::new(karyon_net::unix::listen(
- &self.endpoint,
- Default::default(),
- JsonCodec {},
- )?),
-
- _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- };
-
- Ok(Arc::new(Server {
- listener,
- task_group,
- services: self.services,
- pubsub_services: self.pubsub_services,
- }))
- }
-}
-
-impl Server {
- /// Creates a new [`ServerBuilder`]
- ///
- /// This function initializes a `ServerBuilder` with the specified endpoint.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?;
- /// ```
- pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> {
- let endpoint = endpoint.to_endpoint()?;
- Ok(ServerBuilder {
- endpoint,
- services: HashMap::new(),
- pubsub_services: HashMap::new(),
- tcp_config: Default::default(),
- tls_config: None,
- })
- }
-}