From 8afb4d30750840f66d9f97c2c54a893d3934c45e Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 May 2024 00:19:10 +0200 Subject: jsonrpc: enable concurrent requests in `Client` --- jsonrpc/src/server/mod.rs | 176 ++-------------------------------------------- 1 file changed, 4 insertions(+), 172 deletions(-) (limited to 'jsonrpc/src/server/mod.rs') diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index e1805e1..7f28de2 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -1,3 +1,4 @@ +pub mod builder; pub mod channel; pub mod pubsub_service; pub mod service; @@ -6,25 +7,10 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -#[cfg(feature = "smol")] -use futures_rustls::rustls; -#[cfg(feature = "tokio")] -use tokio_rustls::rustls; +use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_net::{Conn, Endpoint, Listener}; -use karyon_core::{ - async_runtime::Executor, - async_util::{select, Either, TaskGroup, TaskResult}, -}; - -use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - -#[cfg(feature = "ws")] -use karyon_net::ws::ServerWsConfig; - -use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result, TcpConfig}; +use crate::{message, Error, PubSubRPCService, RPCService, Result}; use channel::{ArcChannel, Channel}; @@ -302,157 +288,3 @@ impl Server { } } } - -/// Builder for constructing an RPC [`Server`]. -pub struct ServerBuilder { - endpoint: Endpoint, - tcp_config: TcpConfig, - tls_config: Option, - services: HashMap>, - pubsub_services: HashMap>, -} - -impl ServerBuilder { - /// Adds a new RPC service to the server. - pub fn service(mut self, service: Arc) -> Self { - self.services.insert(service.name(), service); - self - } - - /// Adds a new PubSub RPC service to the server. - pub fn pubsub_service(mut self, service: Arc) -> Self { - self.pubsub_services.insert(service.name(), service); - self - } - - /// Configure TCP settings for the server. - /// - /// # Example - /// - /// ```ignore - /// let tcp_config = TcpConfig::default(); - /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TCP protocols. - pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - /// Configure TLS settings for the server. - /// - /// # Example - /// - /// ```ignore - /// let tls_config = rustls::ServerConfig::new(...); - /// let server = Server::builder()?.tls_config(tls_config)?.build()?; - /// ``` - /// - /// This function will return an error if the endpoint does not support TLS protocols. - pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tls_config = Some(config); - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - /// Builds the server with the configured options. - pub async fn build(self) -> Result> { - self._build(TaskGroup::new()).await - } - - /// Builds the server with the configured options and an executor. - pub async fn build_with_executor(self, ex: Executor) -> Result> { - self._build(TaskGroup::with_executor(ex)).await - } - - async fn _build(self, task_group: TaskGroup) -> Result> { - let listener: Listener = match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::tls::listen( - &self.endpoint, - karyon_net::tls::ServerTlsConfig { - server_config: conf.clone(), - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::ws::listen( - &self.endpoint, - ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(karyon_net::ws::ServerWssConfig { - server_config: conf.clone(), - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => { - let config = ServerWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) - } - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( - &self.endpoint, - Default::default(), - JsonCodec {}, - )?), - - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - Ok(Arc::new(Server { - listener, - task_group, - services: self.services, - pubsub_services: self.pubsub_services, - })) - } -} - -impl Server { - /// Creates a new [`ServerBuilder`] - /// - /// This function initializes a `ServerBuilder` with the specified endpoint. - /// - /// # Example - /// - /// ```ignore - /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; - /// ``` - pub fn builder(endpoint: impl ToEndpoint) -> Result { - let endpoint = endpoint.to_endpoint()?; - Ok(ServerBuilder { - endpoint, - services: HashMap::new(), - pubsub_services: HashMap::new(), - tcp_config: Default::default(), - tls_config: None, - }) - } -} -- cgit v1.2.3