From 6c793e7ed3f3736e2169976f11e304f288ca6813 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 22 Jun 2024 15:33:24 +0200 Subject: jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in `ServerBuilder` and `ClientBuilder` --- jsonrpc/src/client/builder.rs | 131 +++++++++--------------------------------- 1 file changed, 27 insertions(+), 104 deletions(-) (limited to 'jsonrpc/src/client/builder.rs') diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs index 5a7936c..d1e3b67 100644 --- a/jsonrpc/src/client/builder.rs +++ b/jsonrpc/src/client/builder.rs @@ -1,26 +1,17 @@ -use std::sync::{atomic::AtomicBool, Arc}; +use std::sync::Arc; -use karyon_core::async_util::TaskGroup; -use karyon_net::{Conn, Endpoint, ToEndpoint}; +#[cfg(feature = "tcp")] +use karyon_net::Endpoint; +use karyon_net::ToEndpoint; #[cfg(feature = "tls")] -use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig}; - -#[cfg(feature = "ws")] -use karyon_net::ws::ClientWsConfig; - -#[cfg(all(feature = "ws", feature = "tls"))] -use karyon_net::ws::ClientWssConfig; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; +use karyon_net::async_rustls::rustls; +use crate::Result; #[cfg(feature = "tcp")] -use crate::TcpConfig; +use crate::{Error, TcpConfig}; -use crate::{codec::JsonCodec, Error, Result}; - -use super::{Client, MessageDispatcher, Subscriptions}; +use super::{Client, ClientConfig}; const DEFAULT_TIMEOUT: u64 = 3000; // 3s @@ -44,26 +35,22 @@ impl Client { pub fn builder(endpoint: impl ToEndpoint) -> Result { let endpoint = endpoint.to_endpoint()?; Ok(ClientBuilder { - endpoint, - timeout: Some(DEFAULT_TIMEOUT), - #[cfg(feature = "tcp")] - tcp_config: Default::default(), - #[cfg(feature = "tls")] - tls_config: None, - subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE, + inner: ClientConfig { + endpoint, + timeout: Some(DEFAULT_TIMEOUT), + #[cfg(feature = "tcp")] + tcp_config: Default::default(), + #[cfg(feature = "tls")] + tls_config: None, + subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE, + }, }) } } /// Builder for constructing an RPC [`Client`]. pub struct ClientBuilder { - endpoint: Endpoint, - #[cfg(feature = "tcp")] - tcp_config: TcpConfig, - #[cfg(feature = "tls")] - tls_config: Option<(rustls::ClientConfig, String)>, - timeout: Option, - subscription_buffer_size: usize, + inner: ClientConfig, } impl ClientBuilder { @@ -82,7 +69,7 @@ impl ClientBuilder { /// }; /// ``` pub fn set_timeout(mut self, timeout: u64) -> Self { - self.timeout = Some(timeout); + self.inner.timeout = Some(timeout); self } @@ -106,7 +93,7 @@ impl ClientBuilder { /// }; /// ``` pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self { - self.subscription_buffer_size = size; + self.inner.subscription_buffer_size = size; self } @@ -128,12 +115,12 @@ impl ClientBuilder { /// This function will return an error if the endpoint does not support TCP protocols. #[cfg(feature = "tcp")] pub fn tcp_config(mut self, config: TcpConfig) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tcp_config = config; + self.inner.tcp_config = config; Ok(self) } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())), } } @@ -157,14 +144,14 @@ impl ClientBuilder { /// This function will return an error if the endpoint does not support TLS protocols. #[cfg(feature = "tls")] pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { - match self.endpoint { + match self.inner.endpoint { Endpoint::Tls(..) | Endpoint::Wss(..) => { - self.tls_config = Some((config, dns_name.to_string())); + self.inner.tls_config = Some((config, dns_name.to_string())); Ok(self) } _ => Err(Error::UnsupportedProtocol(format!( "Invalid tls config for endpoint: {}", - self.endpoint + self.inner.endpoint ))), } } @@ -189,71 +176,7 @@ impl ClientBuilder { /// /// ``` pub async fn build(self) -> Result> { - let conn: Conn = match self.endpoint { - #[cfg(feature = "tcp")] - Endpoint::Tcp(..) => Box::new( - karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?, - ), - #[cfg(feature = "tls")] - Endpoint::Tls(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::tls::dial( - &self.endpoint, - ClientTlsConfig { - dns_name, - client_config: conf, - tcp_config: self.tcp_config, - }, - JsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) => { - let config = ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: None, - }; - Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?) - } - #[cfg(all(feature = "ws", feature = "tls"))] - Endpoint::Wss(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::ws::dial( - &self.endpoint, - ClientWsConfig { - tcp_config: self.tcp_config, - wss_config: Some(ClientWssConfig { - dns_name, - client_config: conf, - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => return Err(Error::TLSConfigRequired), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new( - karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, - ), - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - let send_chan = async_channel::bounded(10); - - let client = Arc::new(Client { - timeout: self.timeout, - disconnect: AtomicBool::new(false), - send_chan, - message_dispatcher: MessageDispatcher::new(), - subscriptions: Subscriptions::new(self.subscription_buffer_size), - task_group: TaskGroup::new(), - }); - client.start_background_loop(conn); + let client = Client::init(self.inner).await?; Ok(client) } } -- cgit v1.2.3