aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/client.rs')
-rw-r--r--jsonrpc/src/client.rs155
1 files changed, 117 insertions, 38 deletions
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
index efbaf50..50d772b 100644
--- a/jsonrpc/src/client.rs
+++ b/jsonrpc/src/client.rs
@@ -1,37 +1,32 @@
+use std::time::Duration;
+
use log::debug;
use serde::{de::DeserializeOwned, Serialize};
-use karyon_core::util::random_32;
-use karyon_net::ToConn;
+#[cfg(feature = "smol")]
+use futures_rustls::rustls;
+#[cfg(feature = "tokio")]
+use tokio_rustls::rustls;
-use crate::{
- codec::{Codec, CodecConfig},
- message, Error, Result, JSONRPC_VERSION,
+use karyon_core::{async_util::timeout, util::random_32};
+use karyon_net::{
+ tls::ClientTlsConfig,
+ ws::{ClientWsConfig, ClientWssConfig},
+ Conn, Endpoint, ToEndpoint,
};
-/// Represents client config
-#[derive(Default)]
-pub struct ClientConfig {
- pub timeout: Option<u64>,
-}
+use crate::{
+ codec::{JsonCodec, WsJsonCodec},
+ message, Error, Result,
+};
/// Represents an RPC client
pub struct Client {
- codec: Codec,
- config: ClientConfig,
+ conn: Conn<serde_json::Value>,
+ timeout: Option<u64>,
}
impl Client {
- /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection.
- pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self {
- let codec_config = CodecConfig {
- max_allowed_buffer_size: 0,
- ..Default::default()
- };
- let codec = Codec::new(conn.to_conn(), codec_config);
- Self { codec, config }
- }
-
/// Calls the provided method, waits for the response, and returns the result.
pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
&self,
@@ -41,38 +36,122 @@ impl Client {
let id = serde_json::json!(random_32());
let request = message::Request {
- jsonrpc: JSONRPC_VERSION.to_string(),
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
id,
method: method.to_string(),
params: serde_json::json!(params),
};
- let mut payload = serde_json::to_vec(&request)?;
- payload.push(b'\n');
- self.codec.write_all(&payload).await?;
+ let req_json = serde_json::to_value(&request)?;
+ match self.timeout {
+ Some(s) => {
+ let dur = Duration::from_secs(s);
+ timeout(dur, self.conn.send(req_json)).await??;
+ }
+ None => {
+ self.conn.send(req_json).await?;
+ }
+ }
debug!("--> {request}");
- let mut buffer = vec![];
- if let Some(t) = self.config.timeout {
- self.codec.read_until_with_timeout(&mut buffer, t).await?;
- } else {
- self.codec.read_until(&mut buffer).await?;
- };
-
- let response = serde_json::from_slice::<message::Response>(&buffer)?;
+ let msg = self.conn.recv().await?;
+ let response = serde_json::from_value::<message::Response>(msg)?;
debug!("<-- {response}");
- if let Some(error) = response.error {
- return Err(Error::CallError(error.code, error.message));
- }
-
if response.id.is_none() || response.id.unwrap() != request.id {
return Err(Error::InvalidMsg("Invalid response id"));
}
+ if let Some(error) = response.error {
+ return Err(Error::CallError(error.code, error.message));
+ }
+
match response.result {
Some(result) => Ok(serde_json::from_value::<V>(result)?),
None => Err(Error::InvalidMsg("Invalid response result")),
}
}
}
+
+pub struct ClientBuilder {
+ endpoint: Endpoint,
+ tls_config: Option<(rustls::ClientConfig, String)>,
+ timeout: Option<u64>,
+}
+
+impl ClientBuilder {
+ pub fn with_timeout(mut self, timeout: u64) -> Self {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tls_config = Some((config, dns_name.to_string()));
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ pub async fn build(self) -> Result<Client> {
+ let conn: Conn<serde_json::Value> = match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::tls::dial(
+ &self.endpoint,
+ ClientTlsConfig {
+ dns_name,
+ client_config: conf,
+ tcp_config: Default::default(),
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
+ ),
+ },
+ Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::ws::dial(
+ &self.endpoint,
+ ClientWsConfig {
+ tcp_config: Default::default(),
+ wss_config: Some(ClientWssConfig {
+ dns_name,
+ client_config: conf,
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {})
+ .await?,
+ ),
+ },
+ Endpoint::Unix(..) => Box::new(
+ karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
+ ),
+ _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ };
+ Ok(Client {
+ timeout: self.timeout,
+ conn,
+ })
+ }
+}
+impl Client {
+ pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
+ let endpoint = endpoint.to_endpoint()?;
+ Ok(ClientBuilder {
+ endpoint,
+ timeout: None,
+ tls_config: None,
+ })
+ }
+}