From 8afb4d30750840f66d9f97c2c54a893d3934c45e Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 May 2024 00:19:10 +0200 Subject: jsonrpc: enable concurrent requests in `Client` --- jsonrpc/examples/client.rs | 18 ++- jsonrpc/src/client/builder.rs | 180 +++++++++++++++++++++++ jsonrpc/src/client/mod.rs | 324 ++++++++++-------------------------------- jsonrpc/src/lib.rs | 5 +- jsonrpc/src/server/builder.rs | 173 ++++++++++++++++++++++ jsonrpc/src/server/mod.rs | 176 +---------------------- 6 files changed, 453 insertions(+), 423 deletions(-) create mode 100644 jsonrpc/src/client/builder.rs create mode 100644 jsonrpc/src/server/builder.rs (limited to 'jsonrpc') diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs index 3289772..662aacb 100644 --- a/jsonrpc/examples/client.rs +++ b/jsonrpc/examples/client.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use serde::{Deserialize, Serialize}; +use smol::Timer; use karyon_jsonrpc::Client; @@ -20,6 +23,16 @@ fn main() { .await .unwrap(); + let clientc = client.clone(); + smol::spawn(async move { + loop { + Timer::after(Duration::from_millis(500)).await; + let result: Pong = clientc.call("Calc.ping", ()).await.unwrap(); + println!("ping msg result: {:?}", result); + } + }) + .detach(); + let params = Req { x: 10, y: 7 }; let result: u32 = client.call("Calc.add", params).await.unwrap(); println!("result {result}"); @@ -28,10 +41,9 @@ fn main() { let result: u32 = client.call("Calc.sub", params).await.unwrap(); println!("result {result}"); - let result: Pong = client.call("Calc.ping", ()).await.unwrap(); - println!("result {:?}", result); - let result: String = client.call("Calc.version", ()).await.unwrap(); println!("result {result}"); + + Timer::after(Duration::from_secs(10)).await; }); } diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs new file mode 100644 index 0000000..4bfb5c3 --- /dev/null +++ b/jsonrpc/src/client/builder.rs @@ -0,0 +1,180 @@ +use std::{collections::HashMap, sync::Arc}; + +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use karyon_core::{async_runtime::lock::Mutex, async_util::TaskGroup}; +use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; + +#[cfg(feature = "ws")] +use karyon_net::ws::{ClientWsConfig, ClientWssConfig}; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; + +use crate::{codec::JsonCodec, Error, Result, TcpConfig}; + +use super::Client; + +const DEFAULT_TIMEOUT: u64 = 3000; // 3s + +impl Client { + /// Creates a new [`ClientBuilder`] + /// + /// This function initializes a `ClientBuilder` with the specified endpoint. + /// + /// # Example + /// + /// ```ignore + /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ClientBuilder { + endpoint, + timeout: Some(DEFAULT_TIMEOUT), + tls_config: None, + tcp_config: Default::default(), + }) + } +} + +/// Builder for constructing an RPC [`Client`]. +pub struct ClientBuilder { + endpoint: Endpoint, + tls_config: Option<(rustls::ClientConfig, String)>, + tcp_config: TcpConfig, + timeout: Option, +} + +impl ClientBuilder { + /// Set timeout for sending and receiving messages, in milliseconds. + /// + /// # Examples + /// + /// ```ignore + /// let client = Client::builder()?.set_timeout(5000).build()?; + /// ``` + pub fn set_timeout(mut self, timeout: u64) -> Self { + self.timeout = Some(timeout); + self + } + + /// Configure TCP settings for the client. + /// + /// # Example + /// + /// ```ignore + /// let tcp_config = TcpConfig::default(); + /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TCP protocols. + pub fn tcp_config(mut self, config: TcpConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tcp_config = config; + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Configure TLS settings for the client. + /// + /// # Example + /// + /// ```ignore + /// let tls_config = rustls::ClientConfig::new(...); + /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TLS protocols. + pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some((config, dns_name.to_string())); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Build RPC client from [`ClientBuilder`]. + /// + /// This function creates a new RPC client using the configurations + /// specified in the `ClientBuilder`. It returns a `Arc` on success. + /// + /// # Example + /// + /// ```ignore + /// let client = Client::builder(endpoint)? + /// .set_timeout(5000) + /// .tcp_config(tcp_config)? + /// .tls_config(tls_config, "example.com")? + /// .build() + /// .await?; + /// ``` + pub async fn build(self) -> Result> { + let conn: Conn = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.endpoint, + ClientTlsConfig { + dns_name, + client_config: conf, + tcp_config: self.tcp_config, + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?, + ), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &self.endpoint, + ClientWsConfig { + tcp_config: self.tcp_config, + wss_config: Some(ClientWssConfig { + dns_name, + client_config: conf, + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => { + let config = ClientWsConfig { + tcp_config: self.tcp_config, + wss_config: None, + }; + Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?) + } + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new( + karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + let client = Arc::new(Client { + timeout: self.timeout, + conn, + chans: Mutex::new(HashMap::new()), + subscriptions: Mutex::new(HashMap::new()), + task_group: TaskGroup::new(), + }); + client.start_background_receiving(); + Ok(client) + } +} diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index b614c95..0666ee0 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -1,33 +1,22 @@ +pub mod builder; + use std::{collections::HashMap, sync::Arc, time::Duration}; use log::{debug, error, warn}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::json; -#[cfg(feature = "smol")] -use futures_rustls::rustls; -#[cfg(feature = "tokio")] -use tokio_rustls::rustls; - use karyon_core::{ async_runtime::lock::Mutex, async_util::{timeout, TaskGroup, TaskResult}, - util::random_64, + util::random_32, }; -use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; - -#[cfg(feature = "ws")] -use karyon_net::ws::{ClientWsConfig, ClientWssConfig}; +use karyon_net::Conn; -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - -use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig}; +use crate::{message, Error, Result, SubscriptionID}; const CHANNEL_CAP: usize = 10; -const DEFAULT_TIMEOUT: u64 = 3000; // 3s - /// Type alias for a subscription to receive notifications. /// /// The receiver channel is returned by the `subscribe` method to receive @@ -37,9 +26,8 @@ pub type Subscription = async_channel::Receiver; /// Represents an RPC client pub struct Client { conn: Conn, - chan_tx: async_channel::Sender, - chan_rx: async_channel::Receiver, timeout: Option, + chans: Mutex>>, subscriptions: Mutex>>, task_group: TaskGroup, } @@ -51,20 +39,7 @@ impl Client { method: &str, params: T, ) -> Result { - let request = self.send_request(method, params).await?; - - let response = match self.timeout { - Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, - None => self.chan_rx.recv().await?, - }; - - if let Some(error) = response.error { - return Err(Error::CallError(error.code, error.message)); - } - - if response.id.is_none() || response.id.unwrap() != request.id { - return Err(Error::InvalidMsg("Invalid response id")); - } + let response = self.send_request(method, params).await?; match response.result { Some(result) => Ok(serde_json::from_value::(result)?), @@ -82,20 +57,7 @@ impl Client { method: &str, params: T, ) -> Result<(SubscriptionID, Subscription)> { - let request = self.send_request(method, params).await?; - - let response = match self.timeout { - Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, - None => self.chan_rx.recv().await?, - }; - - if let Some(error) = response.error { - return Err(Error::SubscribeError(error.code, error.message)); - } - - if response.id.is_none() || response.id.unwrap() != request.id { - return Err(Error::InvalidMsg("Invalid response id")); - } + let response = self.send_request(method, params).await?; let sub_id = match response.result { Some(result) => serde_json::from_value::(result)?, @@ -113,21 +75,7 @@ impl Client { /// This function sends an unsubscription request for the specified method /// and subscription ID. It waits for the response to confirm the unsubscription. pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { - let request = self.send_request(method, sub_id).await?; - - let response = match self.timeout { - Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, - None => self.chan_rx.recv().await?, - }; - - if let Some(error) = response.error { - return Err(Error::SubscribeError(error.code, error.message)); - } - - if response.id.is_none() || response.id.unwrap() != request.id { - return Err(Error::InvalidMsg("Invalid response id")); - } - + let _ = self.send_request(method, sub_id).await?; self.subscriptions.lock().await.remove(&sub_id); Ok(()) } @@ -136,8 +84,8 @@ impl Client { &self, method: &str, params: T, - ) -> Result { - let id = random_64(); + ) -> Result { + let id = random_32(); let request = message::Request { jsonrpc: message::JSONRPC_VERSION.to_string(), id: json!(id), @@ -157,8 +105,39 @@ impl Client { } } + let (tx, rx) = async_channel::bounded(CHANNEL_CAP); + self.chans.lock().await.insert(id, tx); + + let response = match self.wait_for_response(rx).await { + Ok(r) => r, + Err(err) => { + self.chans.lock().await.remove(&id); + return Err(err); + } + }; + + if let Some(error) = response.error { + return Err(Error::SubscribeError(error.code, error.message)); + } + + if *response.id.as_ref().unwrap() != request.id { + return Err(Error::InvalidMsg("Invalid response id")); + } + debug!("--> {request}"); - Ok(request) + Ok(response) + } + + async fn wait_for_response( + &self, + rx: async_channel::Receiver, + ) -> Result { + match self.timeout { + Some(t) => timeout(Duration::from_millis(t), rx.recv()) + .await? + .map_err(Error::from), + None => rx.recv().await.map_err(Error::from), + } } fn start_background_receiving(self: &Arc) { @@ -175,201 +154,54 @@ impl Client { async move { loop { let msg = selfc.conn.recv().await?; - if let Ok(res) = serde_json::from_value::(msg.clone()) { - debug!("<-- {res}"); - selfc.chan_tx.send(res).await?; - continue; - } - - if let Ok(nt) = serde_json::from_value::(msg.clone()) { - debug!("<-- {nt}"); - let sub_result: message::NotificationResult = match nt.params { - Some(ref p) => serde_json::from_value(p.clone())?, - None => return Err(Error::InvalidMsg("Invalid notification msg")), - }; - - match selfc - .subscriptions - .lock() - .await - .get(&sub_result.subscription) - { - Some(s) => { - s.send(sub_result.result.unwrap_or(json!(""))).await?; - continue; - } - None => { - warn!("Receive unknown notification {}", sub_result.subscription); - continue; - } - } - } - - error!("Receive unexpected msg: {msg}"); - return Err(Error::InvalidMsg("Unexpected msg")); + selfc.handle_msg(msg).await?; } }, on_failure, ); } -} - -/// Builder for constructing an RPC [`Client`]. -pub struct ClientBuilder { - endpoint: Endpoint, - tls_config: Option<(rustls::ClientConfig, String)>, - tcp_config: TcpConfig, - timeout: Option, -} - -impl ClientBuilder { - /// Set timeout for sending and receiving messages, in milliseconds. - /// - /// # Examples - /// - /// ```ignore - /// let client = Client::builder()?.set_timeout(5000).build()?; - /// ``` - pub fn set_timeout(mut self, timeout: u64) -> Self { - self.timeout = Some(timeout); - self - } - /// Configure TCP settings for the client. - /// - /// # Example - /// - /// ```ignore - /// let tcp_config = TcpConfig::default(); - /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TCP protocols. - pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; - Ok(self) + async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> { + if let Ok(res) = serde_json::from_value::(msg.clone()) { + debug!("<-- {res}"); + if res.id.is_none() { + return Err(Error::InvalidMsg("Response id is none")); } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - /// Configure TLS settings for the client. - /// - /// # Example - /// - /// ```ignore - /// let tls_config = rustls::ClientConfig::new(...); - /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TLS protocols. - pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tls_config = Some((config, dns_name.to_string())); - Ok(self) + let id: u32 = serde_json::from_value(res.id.clone().unwrap())?; + match self.chans.lock().await.remove(&id) { + Some(tx) => tx.send(res).await?, + None => return Err(Error::InvalidMsg("Receive unkown message")), } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + + return Ok(()); } - } - /// Build RPC client from [`ClientBuilder`]. - /// - /// This function creates a new RPC client using the configurations - /// specified in the `ClientBuilder`. It returns a `Arc` on success. - /// - /// # Example - /// - /// ```ignore - /// let client = Client::builder(endpoint)? - /// .set_timeout(5000) - /// .tcp_config(tcp_config)? - /// .tls_config(tls_config, "example.com")? - /// .build() - /// .await?; - /// ``` - pub async fn build(self) -> Result> { - let conn: Conn = match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::tls::dial( - &self.endpoint, - ClientTlsConfig { - dns_name, - client_config: conf, - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::ws::dial( - &self.endpoint, - ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(ClientWssConfig { - dns_name, - client_config: conf, - }), - }, - WsJsonCodec {}, - ) - .await?, - ), + if let Ok(nt) = serde_json::from_value::(msg.clone()) { + debug!("<-- {nt}"); + let sub_result: message::NotificationResult = match nt.params { + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), + }; + + match self + .subscriptions + .lock() + .await + .get(&sub_result.subscription) + { + Some(s) => { + s.send(sub_result.result.unwrap_or(json!(""))).await?; + return Ok(()); + } None => { - let config = ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?) + warn!("Receive unknown notification {}", sub_result.subscription); + return Ok(()); } - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new( - karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, - ), - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; + } + } - let (tx, rx) = async_channel::bounded(CHANNEL_CAP); - let client = Arc::new(Client { - timeout: self.timeout, - conn, - chan_tx: tx, - chan_rx: rx, - subscriptions: Mutex::new(HashMap::new()), - task_group: TaskGroup::new(), - }); - client.start_background_receiving(); - Ok(client) - } -} -impl Client { - /// Creates a new [`ClientBuilder`] - /// - /// This function initializes a `ClientBuilder` with the specified endpoint. - /// - /// # Example - /// - /// ```ignore - /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?; - /// ``` - pub fn builder(endpoint: impl ToEndpoint) -> Result { - let endpoint = endpoint.to_endpoint()?; - Ok(ClientBuilder { - endpoint, - timeout: Some(DEFAULT_TIMEOUT), - tls_config: None, - tcp_config: Default::default(), - }) + error!("Receive unexpected msg: {msg}"); + Err(Error::InvalidMsg("Unexpected msg")) } } diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 5577455..14840fa 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -6,13 +6,14 @@ mod error; pub mod message; mod server; -pub use client::{Client, ClientBuilder}; +pub use client::{builder::ClientBuilder, Client}; pub use error::{Error, Result}; pub use server::{ + builder::ServerBuilder, channel::{ArcChannel, Channel, Subscription, SubscriptionID}, pubsub_service::{PubSubRPCMethod, PubSubRPCService}, service::{RPCMethod, RPCService}, - Server, ServerBuilder, + Server, }; pub use karyon_jsonrpc_macro::{rpc_impl, rpc_pubsub_impl}; diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs new file mode 100644 index 0000000..90024f3 --- /dev/null +++ b/jsonrpc/src/server/builder.rs @@ -0,0 +1,173 @@ +use std::{collections::HashMap, sync::Arc}; + +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use karyon_core::{async_runtime::Executor, async_util::TaskGroup}; +use karyon_net::{Endpoint, Listener, ToEndpoint}; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; + +#[cfg(feature = "ws")] +use karyon_net::ws::ServerWsConfig; + +use crate::{codec::JsonCodec, Error, PubSubRPCService, RPCService, Result, TcpConfig}; + +use super::Server; + +/// Builder for constructing an RPC [`Server`]. +pub struct ServerBuilder { + endpoint: Endpoint, + tcp_config: TcpConfig, + tls_config: Option, + services: HashMap>, + pubsub_services: HashMap>, +} + +impl ServerBuilder { + /// Adds a new RPC service to the server. + pub fn service(mut self, service: Arc) -> Self { + self.services.insert(service.name(), service); + self + } + + /// Adds a new PubSub RPC service to the server. + pub fn pubsub_service(mut self, service: Arc) -> Self { + self.pubsub_services.insert(service.name(), service); + self + } + + /// Configure TCP settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tcp_config = TcpConfig::default(); + /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TCP protocols. + pub fn tcp_config(mut self, config: TcpConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tcp_config = config; + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Configure TLS settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tls_config = rustls::ServerConfig::new(...); + /// let server = Server::builder()?.tls_config(tls_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TLS protocols. + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Builds the server with the configured options. + pub async fn build(self) -> Result> { + self._build(TaskGroup::new()).await + } + + /// Builds the server with the configured options and an executor. + pub async fn build_with_executor(self, ex: Executor) -> Result> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result> { + let listener: Listener = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: self.tcp_config, + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, + ), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => { + let config = ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: None, + }; + Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) + } + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + pubsub_services: self.pubsub_services, + })) + } +} + +impl Server { + /// Creates a new [`ServerBuilder`] + /// + /// This function initializes a `ServerBuilder` with the specified endpoint. + /// + /// # Example + /// + /// ```ignore + /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + pubsub_services: HashMap::new(), + tcp_config: Default::default(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index e1805e1..7f28de2 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -1,3 +1,4 @@ +pub mod builder; pub mod channel; pub mod pubsub_service; pub mod service; @@ -6,25 +7,10 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -#[cfg(feature = "smol")] -use futures_rustls::rustls; -#[cfg(feature = "tokio")] -use tokio_rustls::rustls; +use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_net::{Conn, Endpoint, Listener}; -use karyon_core::{ - async_runtime::Executor, - async_util::{select, Either, TaskGroup, TaskResult}, -}; - -use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - -#[cfg(feature = "ws")] -use karyon_net::ws::ServerWsConfig; - -use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result, TcpConfig}; +use crate::{message, Error, PubSubRPCService, RPCService, Result}; use channel::{ArcChannel, Channel}; @@ -302,157 +288,3 @@ impl Server { } } } - -/// Builder for constructing an RPC [`Server`]. -pub struct ServerBuilder { - endpoint: Endpoint, - tcp_config: TcpConfig, - tls_config: Option, - services: HashMap>, - pubsub_services: HashMap>, -} - -impl ServerBuilder { - /// Adds a new RPC service to the server. - pub fn service(mut self, service: Arc) -> Self { - self.services.insert(service.name(), service); - self - } - - /// Adds a new PubSub RPC service to the server. - pub fn pubsub_service(mut self, service: Arc) -> Self { - self.pubsub_services.insert(service.name(), service); - self - } - - /// Configure TCP settings for the server. - /// - /// # Example - /// - /// ```ignore - /// let tcp_config = TcpConfig::default(); - /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TCP protocols. - pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - /// Configure TLS settings for the server. - /// - /// # Example - /// - /// ```ignore - /// let tls_config = rustls::ServerConfig::new(...); - /// let server = Server::builder()?.tls_config(tls_config)?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TLS protocols. - pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tls_config = Some(config); - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - /// Builds the server with the configured options. - pub async fn build(self) -> Result> { - self._build(TaskGroup::new()).await - } - - /// Builds the server with the configured options and an executor. - pub async fn build_with_executor(self, ex: Executor) -> Result> { - self._build(TaskGroup::with_executor(ex)).await - } - - async fn _build(self, task_group: TaskGroup) -> Result> { - let listener: Listener = match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::tls::listen( - &self.endpoint, - karyon_net::tls::ServerTlsConfig { - server_config: conf.clone(), - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::ws::listen( - &self.endpoint, - ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(karyon_net::ws::ServerWssConfig { - server_config: conf.clone(), - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => { - let config = ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) - } - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( - &self.endpoint, - Default::default(), - JsonCodec {}, - )?), - - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - Ok(Arc::new(Server { - listener, - task_group, - services: self.services, - pubsub_services: self.pubsub_services, - })) - } -} - -impl Server { - /// Creates a new [`ServerBuilder`] - /// - /// This function initializes a `ServerBuilder` with the specified endpoint. - /// - /// # Example - /// - /// ```ignore - /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; - /// ``` - pub fn builder(endpoint: impl ToEndpoint) -> Result { - let endpoint = endpoint.to_endpoint()?; - Ok(ServerBuilder { - endpoint, - services: HashMap::new(), - pubsub_services: HashMap::new(), - tcp_config: Default::default(), - tls_config: None, - }) - } -} -- cgit v1.2.3