aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-22 15:33:24 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-22 15:33:24 +0200
commit6c793e7ed3f3736e2169976f11e304f288ca6813 (patch)
tree5ea269f9b0a3148e6fc206ef166900309a0d43dc /jsonrpc/src/client/mod.rs
parent0a2c0dbc6c1afd56e9db0d93eef1ae05fe81a30b (diff)
jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in
`ServerBuilder` and `ClientBuilder`
Diffstat (limited to 'jsonrpc/src/client/mod.rs')
-rw-r--r--jsonrpc/src/client/mod.rs119
1 files changed, 111 insertions, 8 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 80125b1..51f0233 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -15,13 +15,26 @@ use log::{debug, error};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
+#[cfg(feature = "tcp")]
+use karyon_net::tcp::TcpConfig;
+#[cfg(feature = "ws")]
+use karyon_net::ws::ClientWsConfig;
+#[cfg(all(feature = "ws", feature = "tls"))]
+use karyon_net::ws::ClientWssConfig;
+#[cfg(feature = "tls")]
+use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig};
+use karyon_net::{Conn, Endpoint};
+
use karyon_core::{
async_util::{select, timeout, Either, TaskGroup, TaskResult},
util::random_32,
};
-use karyon_net::Conn;
+
+#[cfg(feature = "ws")]
+use crate::codec::WsJsonCodec;
use crate::{
+ codec::JsonCodec,
message::{self, SubscriptionID},
Error, Result,
};
@@ -32,14 +45,24 @@ use subscriptions::Subscriptions;
type RequestID = u32;
+struct ClientConfig {
+ endpoint: Endpoint,
+ #[cfg(feature = "tcp")]
+ tcp_config: TcpConfig,
+ #[cfg(feature = "tls")]
+ tls_config: Option<(rustls::ClientConfig, String)>,
+ timeout: Option<u64>,
+ subscription_buffer_size: usize,
+}
+
/// Represents an RPC client
pub struct Client {
- timeout: Option<u64>,
disconnect: AtomicBool,
message_dispatcher: MessageDispatcher,
- task_group: TaskGroup,
- send_chan: (Sender<serde_json::Value>, Receiver<serde_json::Value>),
subscriptions: Arc<Subscriptions>,
+ send_chan: (Sender<serde_json::Value>, Receiver<serde_json::Value>),
+ task_group: TaskGroup,
+ config: ClientConfig,
}
#[derive(Serialize, Deserialize)]
@@ -96,6 +119,11 @@ impl Client {
Ok(())
}
+ /// Disconnect the client
+ pub async fn stop(&self) {
+ self.task_group.cancel().await;
+ }
+
async fn send_request<T: Serialize + DeserializeOwned>(
&self,
method: &str,
@@ -116,7 +144,7 @@ impl Client {
let rx = self.message_dispatcher.register(id).await;
// Wait for the message dispatcher to send the response
- let result = match self.timeout {
+ let result = match self.config.timeout {
Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?,
None => rx.recv().await,
};
@@ -152,11 +180,86 @@ impl Client {
Ok(())
}
+ async fn init(config: ClientConfig) -> Result<Arc<Self>> {
+ let client = Arc::new(Client {
+ disconnect: AtomicBool::new(false),
+ subscriptions: Subscriptions::new(config.subscription_buffer_size),
+ send_chan: async_channel::bounded(10),
+ message_dispatcher: MessageDispatcher::new(),
+ task_group: TaskGroup::new(),
+ config,
+ });
+
+ let conn = client.connect().await?;
+ client.start_background_loop(conn);
+ Ok(client)
+ }
+
+ async fn connect(self: &Arc<Self>) -> Result<Conn<serde_json::Value>> {
+ let endpoint = self.config.endpoint.clone();
+ let conn: Conn<serde_json::Value> = match endpoint {
+ #[cfg(feature = "tcp")]
+ Endpoint::Tcp(..) => Box::new(
+ karyon_net::tcp::dial(&endpoint, self.config.tcp_config.clone(), JsonCodec {})
+ .await?,
+ ),
+ #[cfg(feature = "tls")]
+ Endpoint::Tls(..) => match &self.config.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::tls::dial(
+ &self.config.endpoint,
+ ClientTlsConfig {
+ dns_name: dns_name.to_string(),
+ client_config: conf.clone(),
+ tcp_config: self.config.tcp_config.clone(),
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => return Err(Error::TLSConfigRequired),
+ },
+ #[cfg(feature = "ws")]
+ Endpoint::Ws(..) => {
+ let config = ClientWsConfig {
+ tcp_config: self.config.tcp_config.clone(),
+ wss_config: None,
+ };
+ Box::new(karyon_net::ws::dial(&endpoint, config, WsJsonCodec {}).await?)
+ }
+ #[cfg(all(feature = "ws", feature = "tls"))]
+ Endpoint::Wss(..) => match &self.config.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::ws::dial(
+ &endpoint,
+ ClientWsConfig {
+ tcp_config: self.config.tcp_config.clone(),
+ wss_config: Some(ClientWssConfig {
+ dns_name: dns_name.clone(),
+ client_config: conf.clone(),
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => return Err(Error::TLSConfigRequired),
+ },
+ #[cfg(all(feature = "unix", target_family = "unix"))]
+ Endpoint::Unix(..) => {
+ Box::new(karyon_net::unix::dial(&endpoint, Default::default(), JsonCodec {}).await?)
+ }
+ _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())),
+ };
+
+ Ok(conn)
+ }
+
fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) {
let selfc = self.clone();
let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
- error!("Background loop stopped: {err}");
+ error!("Client stopped: {err}");
}
selfc.disconnect.store(true, Ordering::Relaxed);
selfc.subscriptions.clear().await;
@@ -201,8 +304,8 @@ impl Client {
self.subscriptions.notify(nt).await
}
},
- Err(_) => {
- error!("Receive unexpected msg: {msg}");
+ Err(err) => {
+ error!("Receive unexpected msg {msg}: {err}");
Err(Error::InvalidMsg("Unexpected msg"))
}
}