aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-20 23:15:10 +0300
committerhozan23 <hozan23@proton.me>2023-11-20 23:15:10 +0300
commit598f9e2d47da80f2bec2ead9c2fe215eff157936 (patch)
tree121f1a391a01d3b7856d948aaaa3c154317f0cd6 /jsonrpc/src/client.rs
parent2ee34b432e7652a34ee64a706b5ebc1bce867dce (diff)
jsonrpc: add Codec struct for reading from and writing to the connection
Diffstat (limited to 'jsonrpc/src/client.rs')
-rw-r--r--jsonrpc/src/client.rs49
1 files changed, 30 insertions, 19 deletions
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
index 2863204..d5caebe 100644
--- a/jsonrpc/src/client.rs
+++ b/jsonrpc/src/client.rs
@@ -1,31 +1,46 @@
use log::debug;
use serde::{de::DeserializeOwned, Serialize};
-use karyons_core::{async_utils::timeout, utils::random_32};
+use karyons_core::utils::random_32;
use karyons_net::{dial, Conn, Endpoint};
use crate::{
- message,
- utils::{read_until, write_all},
- Error, Result, JSONRPC_VERSION,
+ codec::{Codec, CodecConfig},
+ message, Error, Result, JSONRPC_VERSION,
};
+/// Represents client config
+#[derive(Default)]
+pub struct ClientConfig {
+ pub timeout: Option<u64>,
+}
+
/// Represents an RPC client
pub struct Client {
- conn: Conn,
- timeout: Option<u64>,
+ codec: Codec,
+ config: ClientConfig,
}
impl Client {
/// Creates a new RPC client.
- pub fn new(conn: Conn, timeout: Option<u64>) -> Self {
- Self { conn, timeout }
+ pub fn new(conn: Conn, config: ClientConfig) -> Self {
+ let codec_config = CodecConfig {
+ max_allowed_msg_size: 0,
+ ..Default::default()
+ };
+ let codec = Codec::new(conn, codec_config);
+ Self { codec, config }
}
/// Creates a new RPC client using the provided endpoint.
- pub async fn new_with_endpoint(endpoint: &Endpoint, timeout: Option<u64>) -> Result<Self> {
+ pub async fn new_with_endpoint(endpoint: &Endpoint, config: ClientConfig) -> Result<Self> {
let conn = dial(endpoint).await?;
- Ok(Self { conn, timeout })
+ let codec_config = CodecConfig {
+ max_allowed_msg_size: 0,
+ ..Default::default()
+ };
+ let codec = Codec::new(conn, codec_config);
+ Ok(Self { codec, config })
}
/// Calls the named method, waits for the response, and returns the result.
@@ -44,19 +59,15 @@ impl Client {
};
let payload = serde_json::to_vec(&request)?;
- write_all(&self.conn, &payload).await?;
+ self.codec.write_all(&payload).await?;
debug!("--> {request}");
let mut buffer = vec![];
- if let Some(t) = self.timeout {
- timeout(
- std::time::Duration::from_secs(t),
- read_until(&self.conn, &mut buffer),
- )
- .await?
+ if let Some(t) = self.config.timeout {
+ self.codec.read_until_timeout(&mut buffer, t).await?;
} else {
- read_until(&self.conn, &mut buffer).await
- }?;
+ self.codec.read_until(&mut buffer).await?;
+ };
let response = serde_json::from_slice::<message::Response>(&buffer)?;
debug!("<-- {response}");