From 6c793e7ed3f3736e2169976f11e304f288ca6813 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 22 Jun 2024 15:33:24 +0200 Subject: jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in `ServerBuilder` and `ClientBuilder` --- jsonrpc/src/client/builder.rs | 131 ++++++++---------------------------- jsonrpc/src/client/mod.rs | 119 +++++++++++++++++++++++++++++--- jsonrpc/src/client/subscriptions.rs | 2 +- jsonrpc/src/error.rs | 3 + jsonrpc/src/server/builder.rs | 128 ++++++++--------------------------- jsonrpc/src/server/mod.rs | 108 +++++++++++++++++++++++++++-- 6 files changed, 272 insertions(+), 219 deletions(-) diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs index 5a7936c..d1e3b67 100644 --- a/jsonrpc/src/client/builder.rs +++ b/jsonrpc/src/client/builder.rs @@ -1,26 +1,17 @@ -use std::sync::{atomic::AtomicBool, Arc}; +use std::sync::Arc; -use karyon_core::async_util::TaskGroup; -use karyon_net::{Conn, Endpoint, ToEndpoint}; +#[cfg(feature = "tcp")] +use karyon_net::Endpoint; +use karyon_net::ToEndpoint; #[cfg(feature = "tls")] -use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig}; - -#[cfg(feature = "ws")] -use karyon_net::ws::ClientWsConfig; - -#[cfg(all(feature = "ws", feature = "tls"))] -use karyon_net::ws::ClientWssConfig; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; +use karyon_net::async_rustls::rustls; +use crate::Result; #[cfg(feature = "tcp")] -use crate::TcpConfig; +use crate::{Error, TcpConfig}; -use crate::{codec::JsonCodec, Error, Result}; - -use super::{Client, MessageDispatcher, Subscriptions}; +use super::{Client, ClientConfig}; const DEFAULT_TIMEOUT: u64 = 3000; // 3s @@ -44,26 +35,22 @@ impl Client { pub fn builder(endpoint: impl ToEndpoint) -> Result { let endpoint = endpoint.to_endpoint()?; Ok(ClientBuilder { - endpoint, - timeout: Some(DEFAULT_TIMEOUT), - #[cfg(feature = "tcp")] - tcp_config: Default::default(), - #[cfg(feature = "tls")] - tls_config: None, - subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE, + inner: ClientConfig { + endpoint, + timeout: Some(DEFAULT_TIMEOUT), + #[cfg(feature = "tcp")] + tcp_config: Default::default(), + #[cfg(feature = "tls")] + tls_config: None, + subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE, + }, }) } } /// Builder for constructing an RPC [`Client`]. pub struct ClientBuilder { - endpoint: Endpoint, - #[cfg(feature = "tcp")] - tcp_config: TcpConfig, - #[cfg(feature = "tls")] - tls_config: Option<(rustls::ClientConfig, String)>, - timeout: Option, - subscription_buffer_size: usize, + inner: ClientConfig, } impl ClientBuilder { @@ -82,7 +69,7 @@ impl ClientBuilder { /// }; /// ``` pub fn set_timeout(mut self, timeout: u64) -> Self { - self.timeout = Some(timeout); + self.inner.timeout = Some(timeout); self } @@ -106,7 +93,7 @@ impl ClientBuilder { /// }; /// ``` pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self { - self.subscription_buffer_size = size; + self.inner.subscription_buffer_size = size; self } @@ -128,12 +115,12 @@ impl ClientBuilder { /// This function will return an error if the endpoint does not support TCP protocols. #[cfg(feature = "tcp")] pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; + self.inner.tcp_config = config; Ok(self) } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())), } } @@ -157,14 +144,14 @@ impl ClientBuilder { /// This function will return an error if the endpoint does not support TLS protocols. #[cfg(feature = "tls")] pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tls(..) | Endpoint::Wss(..) => { - self.tls_config = Some((config, dns_name.to_string())); + self.inner.tls_config = Some((config, dns_name.to_string())); Ok(self) } _ => Err(Error::UnsupportedProtocol(format!( "Invalid tls config for endpoint: {}", - self.endpoint + self.inner.endpoint ))), } } @@ -189,71 +176,7 @@ impl ClientBuilder { /// /// ``` pub async fn build(self) -> Result> { - let conn: Conn = match self.endpoint { - #[cfg(feature = "tcp")] - Endpoint::Tcp(..) => Box::new( - karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - #[cfg(feature = "tls")] - Endpoint::Tls(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::tls::dial( - &self.endpoint, - ClientTlsConfig { - dns_name, - client_config: conf, - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) => { - let config = ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?) - } - #[cfg(all(feature = "ws", feature = "tls"))] - Endpoint::Wss(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::ws::dial( - &self.endpoint, - ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(ClientWssConfig { - dns_name, - client_config: conf, - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new( - karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, - ), - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - let send_chan = async_channel::bounded(10); - - let client = Arc::new(Client { - timeout: self.timeout, - disconnect: AtomicBool::new(false), - send_chan, - message_dispatcher: MessageDispatcher::new(), - subscriptions: Subscriptions::new(self.subscription_buffer_size), - task_group: TaskGroup::new(), - }); - client.start_background_loop(conn); + let client = Client::init(self.inner).await?; Ok(client) } } diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 80125b1..51f0233 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -15,13 +15,26 @@ use log::{debug, error}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; +#[cfg(feature = "tcp")] +use karyon_net::tcp::TcpConfig; +#[cfg(feature = "ws")] +use karyon_net::ws::ClientWsConfig; +#[cfg(all(feature = "ws", feature = "tls"))] +use karyon_net::ws::ClientWssConfig; +#[cfg(feature = "tls")] +use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig}; +use karyon_net::{Conn, Endpoint}; + use karyon_core::{ async_util::{select, timeout, Either, TaskGroup, TaskResult}, util::random_32, }; -use karyon_net::Conn; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; use crate::{ + codec::JsonCodec, message::{self, SubscriptionID}, Error, Result, }; @@ -32,14 +45,24 @@ use subscriptions::Subscriptions; type RequestID = u32; +struct ClientConfig { + endpoint: Endpoint, + #[cfg(feature = "tcp")] + tcp_config: TcpConfig, + #[cfg(feature = "tls")] + tls_config: Option<(rustls::ClientConfig, String)>, + timeout: Option, + subscription_buffer_size: usize, +} + /// Represents an RPC client pub struct Client { - timeout: Option, disconnect: AtomicBool, message_dispatcher: MessageDispatcher, - task_group: TaskGroup, - send_chan: (Sender, Receiver), subscriptions: Arc, + send_chan: (Sender, Receiver), + task_group: TaskGroup, + config: ClientConfig, } #[derive(Serialize, Deserialize)] @@ -96,6 +119,11 @@ impl Client { Ok(()) } + /// Disconnect the client + pub async fn stop(&self) { + self.task_group.cancel().await; + } + async fn send_request( &self, method: &str, @@ -116,7 +144,7 @@ impl Client { let rx = self.message_dispatcher.register(id).await; // Wait for the message dispatcher to send the response - let result = match self.timeout { + let result = match self.config.timeout { Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?, None => rx.recv().await, }; @@ -152,11 +180,86 @@ impl Client { Ok(()) } + async fn init(config: ClientConfig) -> Result> { + let client = Arc::new(Client { + disconnect: AtomicBool::new(false), + subscriptions: Subscriptions::new(config.subscription_buffer_size), + send_chan: async_channel::bounded(10), + message_dispatcher: MessageDispatcher::new(), + task_group: TaskGroup::new(), + config, + }); + + let conn = client.connect().await?; + client.start_background_loop(conn); + Ok(client) + } + + async fn connect(self: &Arc) -> Result> { + let endpoint = self.config.endpoint.clone(); + let conn: Conn = match endpoint { + #[cfg(feature = "tcp")] + Endpoint::Tcp(..) => Box::new( + karyon_net::tcp::dial(&endpoint, self.config.tcp_config.clone(), JsonCodec {}) + .await?, + ), + #[cfg(feature = "tls")] + Endpoint::Tls(..) => match &self.config.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.config.endpoint, + ClientTlsConfig { + dns_name: dns_name.to_string(), + client_config: conf.clone(), + tcp_config: self.config.tcp_config.clone(), + }, + JsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) => { + let config = ClientWsConfig { + tcp_config: self.config.tcp_config.clone(), + wss_config: None, + }; + Box::new(karyon_net::ws::dial(&endpoint, config, WsJsonCodec {}).await?) + } + #[cfg(all(feature = "ws", feature = "tls"))] + Endpoint::Wss(..) => match &self.config.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &endpoint, + ClientWsConfig { + tcp_config: self.config.tcp_config.clone(), + wss_config: Some(ClientWssConfig { + dns_name: dns_name.clone(), + client_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => { + Box::new(karyon_net::unix::dial(&endpoint, Default::default(), JsonCodec {}).await?) + } + _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())), + }; + + Ok(conn) + } + fn start_background_loop(self: &Arc, conn: Conn) { let selfc = self.clone(); let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { - error!("Background loop stopped: {err}"); + error!("Client stopped: {err}"); } selfc.disconnect.store(true, Ordering::Relaxed); selfc.subscriptions.clear().await; @@ -201,8 +304,8 @@ impl Client { self.subscriptions.notify(nt).await } }, - Err(_) => { - error!("Receive unexpected msg: {msg}"); + Err(err) => { + error!("Receive unexpected msg {msg}: {err}"); Err(Error::InvalidMsg("Unexpected msg")) } } diff --git a/jsonrpc/src/client/subscriptions.rs b/jsonrpc/src/client/subscriptions.rs index f3d8cb2..fe66f96 100644 --- a/jsonrpc/src/client/subscriptions.rs +++ b/jsonrpc/src/client/subscriptions.rs @@ -25,7 +25,7 @@ impl Subscription { } pub async fn recv(&self) -> Result { - self.rx.recv().await.map_err(Error::from) + self.rx.recv().await.map_err(|_| Error::SubscriptionClosed) } pub fn id(&self) -> SubscriptionID { diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 7083e8a..1b38519 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -35,6 +35,9 @@ pub enum Error { #[error("Subscription exceeds the maximum buffer size")] SubscriptionBufferFull, + #[error("Subscription closed")] + SubscriptionClosed, + #[error("ClientDisconnected")] ClientDisconnected, diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs index cd79ac3..01daf8e 100644 --- a/jsonrpc/src/server/builder.rs +++ b/jsonrpc/src/server/builder.rs @@ -1,33 +1,23 @@ use std::{collections::HashMap, sync::Arc}; -use karyon_core::{async_runtime::Executor, async_util::TaskGroup}; -use karyon_net::{Endpoint, Listener, ToEndpoint}; +use karyon_core::async_runtime::Executor; + +#[cfg(feature = "tcp")] +use karyon_net::Endpoint; +use karyon_net::ToEndpoint; #[cfg(feature = "tls")] use karyon_net::async_rustls::rustls; -#[cfg(feature = "ws")] -use karyon_net::ws::ServerWsConfig; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - #[cfg(feature = "tcp")] -use crate::TcpConfig; +use crate::{Error, TcpConfig}; +use crate::{PubSubRPCService, RPCService, Result}; -use crate::{codec::JsonCodec, Error, PubSubRPCService, RPCService, Result}; - -use super::Server; +use super::{Server, ServerConfig}; /// Builder for constructing an RPC [`Server`]. pub struct ServerBuilder { - endpoint: Endpoint, - #[cfg(feature = "tcp")] - tcp_config: TcpConfig, - #[cfg(feature = "tls")] - tls_config: Option, - services: HashMap>, - pubsub_services: HashMap>, + inner: ServerConfig, } impl ServerBuilder { @@ -58,7 +48,7 @@ impl ServerBuilder { /// /// ``` pub fn service(mut self, service: Arc) -> Self { - self.services.insert(service.name(), service); + self.inner.services.insert(service.name(), service); self } @@ -118,7 +108,7 @@ impl ServerBuilder { /// /// ``` pub fn pubsub_service(mut self, service: Arc) -> Self { - self.pubsub_services.insert(service.name(), service); + self.inner.pubsub_services.insert(service.name(), service); self } @@ -140,12 +130,12 @@ impl ServerBuilder { /// This function will return an error if the endpoint does not support TCP protocols. #[cfg(feature = "tcp")] pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; + self.inner.tcp_config = config; Ok(self) } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())), } } @@ -168,90 +158,26 @@ impl ServerBuilder { /// This function will return an error if the endpoint does not support TLS protocols. #[cfg(feature = "tls")] pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tls(..) | Endpoint::Wss(..) => { - self.tls_config = Some(config); + self.inner.tls_config = Some(config); Ok(self) } _ => Err(Error::UnsupportedProtocol(format!( "Invalid tls config for endpoint: {}", - self.endpoint + self.inner.endpoint ))), } } /// Builds the server with the configured options. pub async fn build(self) -> Result> { - self._build(TaskGroup::new()).await + Server::init(self.inner, None).await } /// Builds the server with the configured options and an executor. pub async fn build_with_executor(self, ex: Executor) -> Result> { - self._build(TaskGroup::with_executor(ex)).await - } - - async fn _build(self, task_group: TaskGroup) -> Result> { - let listener: Listener = match self.endpoint { - #[cfg(feature = "tcp")] - Endpoint::Tcp(..) => Box::new( - karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - #[cfg(feature = "tls")] - Endpoint::Tls(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::tls::listen( - &self.endpoint, - karyon_net::tls::ServerTlsConfig { - server_config: conf.clone(), - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) => { - let config = ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) - } - #[cfg(all(feature = "ws", feature = "tls"))] - Endpoint::Wss(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::ws::listen( - &self.endpoint, - ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(karyon_net::ws::ServerWssConfig { - server_config: conf.clone(), - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( - &self.endpoint, - Default::default(), - JsonCodec {}, - )?), - - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - Ok(Arc::new(Server { - listener, - task_group, - services: self.services, - pubsub_services: self.pubsub_services, - })) + Server::init(self.inner, Some(ex)).await } } @@ -272,13 +198,15 @@ impl Server { pub fn builder(endpoint: impl ToEndpoint) -> Result { let endpoint = endpoint.to_endpoint()?; Ok(ServerBuilder { - endpoint, - services: HashMap::new(), - pubsub_services: HashMap::new(), - #[cfg(feature = "tcp")] - tcp_config: Default::default(), - #[cfg(feature = "tls")] - tls_config: None, + inner: ServerConfig { + endpoint, + services: HashMap::new(), + pubsub_services: HashMap::new(), + #[cfg(feature = "tcp")] + tcp_config: Default::default(), + #[cfg(feature = "tls")] + tls_config: None, + }, }) } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 00b0fd2..ddebeb9 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -8,10 +8,22 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ + async_runtime::Executor, + async_util::{select, Either, TaskGroup, TaskResult}, +}; + +#[cfg(feature = "tls")] +use karyon_net::async_rustls::rustls; +#[cfg(feature = "tcp")] +use karyon_net::tcp::TcpConfig; +#[cfg(feature = "ws")] +use karyon_net::ws::ServerWsConfig; use karyon_net::{Conn, Endpoint, Listener}; -use crate::{message, Error, PubSubRPCService, RPCService, Result}; +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; +use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result}; use channel::Channel; use response_queue::ResponseQueue; @@ -32,12 +44,21 @@ enum SanityCheckResult { ErrRes(message::Response), } +struct ServerConfig { + endpoint: Endpoint, + #[cfg(feature = "tcp")] + tcp_config: TcpConfig, + #[cfg(feature = "tls")] + tls_config: Option, + services: HashMap>, + pubsub_services: HashMap>, +} + /// Represents an RPC server pub struct Server { listener: Listener, task_group: TaskGroup, - services: HashMap>, - pubsub_services: HashMap>, + config: ServerConfig, } impl Server { @@ -265,7 +286,7 @@ impl Server { }; // Check if the service exists in pubsub services list - if let Some(service) = self.pubsub_services.get(&req.srvc_name) { + if let Some(service) = self.config.pubsub_services.get(&req.srvc_name) { // Check if the method exists within the service if let Some(method) = service.get_pubsub_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -279,7 +300,7 @@ impl Server { } // Check if the service exists in services list - if let Some(service) = self.services.get(&req.srvc_name) { + if let Some(service) = self.config.services.get(&req.srvc_name) { // Check if the method exists within the service if let Some(method) = service.get_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -300,4 +321,79 @@ impl Server { response } + + async fn init(config: ServerConfig, ex: Option) -> Result> { + let task_group = match ex { + Some(ex) => TaskGroup::with_executor(ex), + None => TaskGroup::new(), + }; + let listener = Self::listen(&config).await?; + let server = Arc::new(Server { + listener, + task_group, + config, + }); + + Ok(server) + } + + async fn listen(config: &ServerConfig) -> Result> { + let endpoint = config.endpoint.clone(); + let listener: Listener = match endpoint { + #[cfg(feature = "tcp")] + Endpoint::Tcp(..) => Box::new( + karyon_net::tcp::listen(&endpoint, config.tcp_config.clone(), JsonCodec {}).await?, + ), + #[cfg(feature = "tls")] + Endpoint::Tls(..) => match &config.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: config.tcp_config.clone(), + }, + JsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) => { + let config = ServerWsConfig { + tcp_config: config.tcp_config.clone(), + wss_config: None, + }; + Box::new(karyon_net::ws::listen(&endpoint, config, WsJsonCodec {}).await?) + } + #[cfg(all(feature = "ws", feature = "tls"))] + Endpoint::Wss(..) => match &config.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &endpoint, + ServerWsConfig { + tcp_config: config.tcp_config.clone(), + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())), + }; + + Ok(listener) + } } -- cgit v1.2.3