aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-27 00:59:23 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-27 00:59:23 +0200
commitd1c816660c0583db33d160e2ef3e980bef0d5a85 (patch)
tree9eb06e6dbfbe34c6c2f85eee8d2e337b155be103 /jsonrpc/src/client
parent385d53ec53e750e342cce78edb793958edf5133e (diff)
p2p: WIP rpc server implementation for the p2p monitor
Diffstat (limited to 'jsonrpc/src/client')
-rw-r--r--jsonrpc/src/client/mod.rs27
1 files changed, 9 insertions, 18 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 5e3a24f..b614c95 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -26,7 +26,7 @@ use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig}
const CHANNEL_CAP: usize = 10;
-const DEFAULT_TIMEOUT: u64 = 1000; // 1s
+const DEFAULT_TIMEOUT: u64 = 3000; // 3s
/// Type alias for a subscription to receive notifications.
///
@@ -52,13 +52,11 @@ impl Client {
params: T,
) -> Result<V> {
let request = self.send_request(method, params).await?;
- debug!("--> {request}");
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::CallError(error.code, error.message));
@@ -85,13 +83,11 @@ impl Client {
params: T,
) -> Result<(SubscriptionID, Subscription)> {
let request = self.send_request(method, params).await?;
- debug!("--> {request}");
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::SubscribeError(error.code, error.message));
@@ -117,14 +113,12 @@ impl Client {
/// This function sends an unsubscription request for the specified method
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
- let request = self.send_request(method, json!(sub_id)).await?;
- debug!("--> {request}");
+ let request = self.send_request(method, sub_id).await?;
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::SubscribeError(error.code, error.message));
@@ -144,12 +138,11 @@ impl Client {
params: T,
) -> Result<message::Request> {
let id = random_64();
-
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
id: json!(id),
method: method.to_string(),
- params: json!(params),
+ params: Some(json!(params)),
};
let req_json = serde_json::to_value(&request)?;
@@ -164,6 +157,7 @@ impl Client {
}
}
+ debug!("--> {request}");
Ok(request)
}
@@ -182,19 +176,16 @@ impl Client {
loop {
let msg = selfc.conn.recv().await?;
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
+ debug!("<-- {res}");
selfc.chan_tx.send(res).await?;
-
continue;
}
if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
+ debug!("<-- {nt}");
let sub_result: message::NotificationResult = match nt.params {
- Some(p) => serde_json::from_value(p)?,
- None => {
- return Err(Error::InvalidMsg(
- "Invalid notification msg: subscription id not found",
- ))
- }
+ Some(ref p) => serde_json::from_value(p.clone())?,
+ None => return Err(Error::InvalidMsg("Invalid notification msg")),
};
match selfc
@@ -232,7 +223,7 @@ pub struct ClientBuilder {
}
impl ClientBuilder {
- /// Set timeout for requests, in milliseconds.
+ /// Set timeout for sending and receiving messages, in milliseconds.
///
/// # Examples
///