diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-22 15:33:24 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-22 15:33:24 +0200 |
commit | 6c793e7ed3f3736e2169976f11e304f288ca6813 (patch) | |
tree | 5ea269f9b0a3148e6fc206ef166900309a0d43dc /jsonrpc/src/server | |
parent | 0a2c0dbc6c1afd56e9db0d93eef1ae05fe81a30b (diff) |
jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in
`ServerBuilder` and `ClientBuilder`
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r-- | jsonrpc/src/server/builder.rs | 128 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 108 |
2 files changed, 130 insertions, 106 deletions
diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs index cd79ac3..01daf8e 100644 --- a/jsonrpc/src/server/builder.rs +++ b/jsonrpc/src/server/builder.rs @@ -1,33 +1,23 @@ use std::{collections::HashMap, sync::Arc}; -use karyon_core::{async_runtime::Executor, async_util::TaskGroup}; -use karyon_net::{Endpoint, Listener, ToEndpoint}; +use karyon_core::async_runtime::Executor; + +#[cfg(feature = "tcp")] +use karyon_net::Endpoint; +use karyon_net::ToEndpoint; #[cfg(feature = "tls")] use karyon_net::async_rustls::rustls; -#[cfg(feature = "ws")] -use karyon_net::ws::ServerWsConfig; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - #[cfg(feature = "tcp")] -use crate::TcpConfig; +use crate::{Error, TcpConfig}; +use crate::{PubSubRPCService, RPCService, Result}; -use crate::{codec::JsonCodec, Error, PubSubRPCService, RPCService, Result}; - -use super::Server; +use super::{Server, ServerConfig}; /// Builder for constructing an RPC [`Server`]. pub struct ServerBuilder { - endpoint: Endpoint, - #[cfg(feature = "tcp")] - tcp_config: TcpConfig, - #[cfg(feature = "tls")] - tls_config: Option<rustls::ServerConfig>, - services: HashMap<String, Arc<dyn RPCService + 'static>>, - pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>, + inner: ServerConfig, } impl ServerBuilder { @@ -58,7 +48,7 @@ impl ServerBuilder { /// /// ``` pub fn service(mut self, service: Arc<dyn RPCService>) -> Self { - self.services.insert(service.name(), service); + self.inner.services.insert(service.name(), service); self } @@ -118,7 +108,7 @@ impl ServerBuilder { /// /// ``` pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self { - self.pubsub_services.insert(service.name(), service); + self.inner.pubsub_services.insert(service.name(), service); self } @@ -140,12 +130,12 @@ impl ServerBuilder { /// This function will return an error if the endpoint does not support TCP protocols. #[cfg(feature = "tcp")] pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder> { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; + self.inner.tcp_config = config; Ok(self) } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())), } } @@ -168,90 +158,26 @@ impl ServerBuilder { /// This function will return an error if the endpoint does not support TLS protocols. #[cfg(feature = "tls")] pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tls(..) | Endpoint::Wss(..) => { - self.tls_config = Some(config); + self.inner.tls_config = Some(config); Ok(self) } _ => Err(Error::UnsupportedProtocol(format!( "Invalid tls config for endpoint: {}", - self.endpoint + self.inner.endpoint ))), } } /// Builds the server with the configured options. pub async fn build(self) -> Result<Arc<Server>> { - self._build(TaskGroup::new()).await + Server::init(self.inner, None).await } /// Builds the server with the configured options and an executor. pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> { - self._build(TaskGroup::with_executor(ex)).await - } - - async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> { - let listener: Listener<serde_json::Value> = match self.endpoint { - #[cfg(feature = "tcp")] - Endpoint::Tcp(..) => Box::new( - karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - #[cfg(feature = "tls")] - Endpoint::Tls(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::tls::listen( - &self.endpoint, - karyon_net::tls::ServerTlsConfig { - server_config: conf.clone(), - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) => { - let config = ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) - } - #[cfg(all(feature = "ws", feature = "tls"))] - Endpoint::Wss(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::ws::listen( - &self.endpoint, - ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(karyon_net::ws::ServerWssConfig { - server_config: conf.clone(), - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( - &self.endpoint, - Default::default(), - JsonCodec {}, - )?), - - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - Ok(Arc::new(Server { - listener, - task_group, - services: self.services, - pubsub_services: self.pubsub_services, - })) + Server::init(self.inner, Some(ex)).await } } @@ -272,13 +198,15 @@ impl Server { pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> { let endpoint = endpoint.to_endpoint()?; Ok(ServerBuilder { - endpoint, - services: HashMap::new(), - pubsub_services: HashMap::new(), - #[cfg(feature = "tcp")] - tcp_config: Default::default(), - #[cfg(feature = "tls")] - tls_config: None, + inner: ServerConfig { + endpoint, + services: HashMap::new(), + pubsub_services: HashMap::new(), + #[cfg(feature = "tcp")] + tcp_config: Default::default(), + #[cfg(feature = "tls")] + tls_config: None, + }, }) } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 00b0fd2..ddebeb9 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -8,10 +8,22 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ + async_runtime::Executor, + async_util::{select, Either, TaskGroup, TaskResult}, +}; + +#[cfg(feature = "tls")] +use karyon_net::async_rustls::rustls; +#[cfg(feature = "tcp")] +use karyon_net::tcp::TcpConfig; +#[cfg(feature = "ws")] +use karyon_net::ws::ServerWsConfig; use karyon_net::{Conn, Endpoint, Listener}; -use crate::{message, Error, PubSubRPCService, RPCService, Result}; +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; +use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result}; use channel::Channel; use response_queue::ResponseQueue; @@ -32,12 +44,21 @@ enum SanityCheckResult { ErrRes(message::Response), } +struct ServerConfig { + endpoint: Endpoint, + #[cfg(feature = "tcp")] + tcp_config: TcpConfig, + #[cfg(feature = "tls")] + tls_config: Option<rustls::ServerConfig>, + services: HashMap<String, Arc<dyn RPCService + 'static>>, + pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>, +} + /// Represents an RPC server pub struct Server { listener: Listener<serde_json::Value>, task_group: TaskGroup, - services: HashMap<String, Arc<dyn RPCService + 'static>>, - pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>, + config: ServerConfig, } impl Server { @@ -265,7 +286,7 @@ impl Server { }; // Check if the service exists in pubsub services list - if let Some(service) = self.pubsub_services.get(&req.srvc_name) { + if let Some(service) = self.config.pubsub_services.get(&req.srvc_name) { // Check if the method exists within the service if let Some(method) = service.get_pubsub_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -279,7 +300,7 @@ impl Server { } // Check if the service exists in services list - if let Some(service) = self.services.get(&req.srvc_name) { + if let Some(service) = self.config.services.get(&req.srvc_name) { // Check if the method exists within the service if let Some(method) = service.get_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -300,4 +321,79 @@ impl Server { response } + + async fn init(config: ServerConfig, ex: Option<Executor>) -> Result<Arc<Self>> { + let task_group = match ex { + Some(ex) => TaskGroup::with_executor(ex), + None => TaskGroup::new(), + }; + let listener = Self::listen(&config).await?; + let server = Arc::new(Server { + listener, + task_group, + config, + }); + + Ok(server) + } + + async fn listen(config: &ServerConfig) -> Result<Listener<serde_json::Value>> { + let endpoint = config.endpoint.clone(); + let listener: Listener<serde_json::Value> = match endpoint { + #[cfg(feature = "tcp")] + Endpoint::Tcp(..) => Box::new( + karyon_net::tcp::listen(&endpoint, config.tcp_config.clone(), JsonCodec {}).await?, + ), + #[cfg(feature = "tls")] + Endpoint::Tls(..) => match &config.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: config.tcp_config.clone(), + }, + JsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) => { + let config = ServerWsConfig { + tcp_config: config.tcp_config.clone(), + wss_config: None, + }; + Box::new(karyon_net::ws::listen(&endpoint, config, WsJsonCodec {}).await?) + } + #[cfg(all(feature = "ws", feature = "tls"))] + Endpoint::Wss(..) => match &config.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &endpoint, + ServerWsConfig { + tcp_config: config.tcp_config.clone(), + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())), + }; + + Ok(listener) + } } |