From 6c793e7ed3f3736e2169976f11e304f288ca6813 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 22 Jun 2024 15:33:24 +0200 Subject: jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in `ServerBuilder` and `ClientBuilder` --- jsonrpc/src/client/mod.rs | 119 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 8 deletions(-) (limited to 'jsonrpc/src/client/mod.rs') diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 80125b1..51f0233 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -15,13 +15,26 @@ use log::{debug, error}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; +#[cfg(feature = "tcp")] +use karyon_net::tcp::TcpConfig; +#[cfg(feature = "ws")] +use karyon_net::ws::ClientWsConfig; +#[cfg(all(feature = "ws", feature = "tls"))] +use karyon_net::ws::ClientWssConfig; +#[cfg(feature = "tls")] +use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig}; +use karyon_net::{Conn, Endpoint}; + use karyon_core::{ async_util::{select, timeout, Either, TaskGroup, TaskResult}, util::random_32, }; -use karyon_net::Conn; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; use crate::{ + codec::JsonCodec, message::{self, SubscriptionID}, Error, Result, }; @@ -32,14 +45,24 @@ use subscriptions::Subscriptions; type RequestID = u32; +struct ClientConfig { + endpoint: Endpoint, + #[cfg(feature = "tcp")] + tcp_config: TcpConfig, + #[cfg(feature = "tls")] + tls_config: Option<(rustls::ClientConfig, String)>, + timeout: Option, + subscription_buffer_size: usize, +} + /// Represents an RPC client pub struct Client { - timeout: Option, disconnect: AtomicBool, message_dispatcher: MessageDispatcher, - task_group: TaskGroup, - send_chan: (Sender, Receiver), subscriptions: Arc, + send_chan: (Sender, Receiver), + task_group: TaskGroup, + config: ClientConfig, } #[derive(Serialize, Deserialize)] @@ -96,6 +119,11 @@ impl Client { Ok(()) } + /// Disconnect the client + pub async fn stop(&self) { + self.task_group.cancel().await; + } + async fn send_request( &self, method: &str, @@ -116,7 +144,7 @@ impl Client { let rx = self.message_dispatcher.register(id).await; // Wait for the message dispatcher to send the response - let result = match self.timeout { + let result = match self.config.timeout { Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?, None => rx.recv().await, }; @@ -152,11 +180,86 @@ impl Client { Ok(()) } + async fn init(config: ClientConfig) -> Result> { + let client = Arc::new(Client { + disconnect: AtomicBool::new(false), + subscriptions: Subscriptions::new(config.subscription_buffer_size), + send_chan: async_channel::bounded(10), + message_dispatcher: MessageDispatcher::new(), + task_group: TaskGroup::new(), + config, + }); + + let conn = client.connect().await?; + client.start_background_loop(conn); + Ok(client) + } + + async fn connect(self: &Arc) -> Result> { + let endpoint = self.config.endpoint.clone(); + let conn: Conn = match endpoint { + #[cfg(feature = "tcp")] + Endpoint::Tcp(..) => Box::new( + karyon_net::tcp::dial(&endpoint, self.config.tcp_config.clone(), JsonCodec {}) + .await?, + ), + #[cfg(feature = "tls")] + Endpoint::Tls(..) => match &self.config.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.config.endpoint, + ClientTlsConfig { + dns_name: dns_name.to_string(), + client_config: conf.clone(), + tcp_config: self.config.tcp_config.clone(), + }, + JsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) => { + let config = ClientWsConfig { + tcp_config: self.config.tcp_config.clone(), + wss_config: None, + }; + Box::new(karyon_net::ws::dial(&endpoint, config, WsJsonCodec {}).await?) + } + #[cfg(all(feature = "ws", feature = "tls"))] + Endpoint::Wss(..) => match &self.config.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &endpoint, + ClientWsConfig { + tcp_config: self.config.tcp_config.clone(), + wss_config: Some(ClientWssConfig { + dns_name: dns_name.clone(), + client_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => return Err(Error::TLSConfigRequired), + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => { + Box::new(karyon_net::unix::dial(&endpoint, Default::default(), JsonCodec {}).await?) + } + _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())), + }; + + Ok(conn) + } + fn start_background_loop(self: &Arc, conn: Conn) { let selfc = self.clone(); let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { - error!("Background loop stopped: {err}"); + error!("Client stopped: {err}"); } selfc.disconnect.store(true, Ordering::Relaxed); selfc.subscriptions.clear().await; @@ -201,8 +304,8 @@ impl Client { self.subscriptions.notify(nt).await } }, - Err(_) => { - error!("Receive unexpected msg: {msg}"); + Err(err) => { + error!("Receive unexpected msg {msg}: {err}"); Err(Error::InvalidMsg("Unexpected msg")) } } -- cgit v1.2.3