diff options
author | hozan23 <hozan23@proton.me> | 2023-11-20 23:15:10 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-20 23:15:10 +0300 |
commit | 598f9e2d47da80f2bec2ead9c2fe215eff157936 (patch) | |
tree | 121f1a391a01d3b7856d948aaaa3c154317f0cd6 /jsonrpc/src/client.rs | |
parent | 2ee34b432e7652a34ee64a706b5ebc1bce867dce (diff) |
jsonrpc: add Codec struct for reading from and writing to the connection
Diffstat (limited to 'jsonrpc/src/client.rs')
-rw-r--r-- | jsonrpc/src/client.rs | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index 2863204..d5caebe 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -1,31 +1,46 @@ use log::debug; use serde::{de::DeserializeOwned, Serialize}; -use karyons_core::{async_utils::timeout, utils::random_32}; +use karyons_core::utils::random_32; use karyons_net::{dial, Conn, Endpoint}; use crate::{ - message, - utils::{read_until, write_all}, - Error, Result, JSONRPC_VERSION, + codec::{Codec, CodecConfig}, + message, Error, Result, JSONRPC_VERSION, }; +/// Represents client config +#[derive(Default)] +pub struct ClientConfig { + pub timeout: Option<u64>, +} + /// Represents an RPC client pub struct Client { - conn: Conn, - timeout: Option<u64>, + codec: Codec, + config: ClientConfig, } impl Client { /// Creates a new RPC client. - pub fn new(conn: Conn, timeout: Option<u64>) -> Self { - Self { conn, timeout } + pub fn new(conn: Conn, config: ClientConfig) -> Self { + let codec_config = CodecConfig { + max_allowed_msg_size: 0, + ..Default::default() + }; + let codec = Codec::new(conn, codec_config); + Self { codec, config } } /// Creates a new RPC client using the provided endpoint. - pub async fn new_with_endpoint(endpoint: &Endpoint, timeout: Option<u64>) -> Result<Self> { + pub async fn new_with_endpoint(endpoint: &Endpoint, config: ClientConfig) -> Result<Self> { let conn = dial(endpoint).await?; - Ok(Self { conn, timeout }) + let codec_config = CodecConfig { + max_allowed_msg_size: 0, + ..Default::default() + }; + let codec = Codec::new(conn, codec_config); + Ok(Self { codec, config }) } /// Calls the named method, waits for the response, and returns the result. @@ -44,19 +59,15 @@ impl Client { }; let payload = serde_json::to_vec(&request)?; - write_all(&self.conn, &payload).await?; + self.codec.write_all(&payload).await?; debug!("--> {request}"); let mut buffer = vec![]; - if let Some(t) = self.timeout { - timeout( - std::time::Duration::from_secs(t), - read_until(&self.conn, &mut buffer), - ) - .await? + if let Some(t) = self.config.timeout { + self.codec.read_until_timeout(&mut buffer, t).await?; } else { - read_until(&self.conn, &mut buffer).await - }?; + self.codec.read_until(&mut buffer).await?; + }; let response = serde_json::from_slice::<message::Response>(&buffer)?; debug!("<-- {response}"); |