From d1c816660c0583db33d160e2ef3e980bef0d5a85 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 27 May 2024 00:59:23 +0200 Subject: p2p: WIP rpc server implementation for the p2p monitor --- jsonrpc/src/client/mod.rs | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) (limited to 'jsonrpc/src/client/mod.rs') diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 5e3a24f..b614c95 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -26,7 +26,7 @@ use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig} const CHANNEL_CAP: usize = 10; -const DEFAULT_TIMEOUT: u64 = 1000; // 1s +const DEFAULT_TIMEOUT: u64 = 3000; // 3s /// Type alias for a subscription to receive notifications. /// @@ -52,13 +52,11 @@ impl Client { params: T, ) -> Result { let request = self.send_request(method, params).await?; - debug!("--> {request}"); let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::CallError(error.code, error.message)); @@ -85,13 +83,11 @@ impl Client { params: T, ) -> Result<(SubscriptionID, Subscription)> { let request = self.send_request(method, params).await?; - debug!("--> {request}"); let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::SubscribeError(error.code, error.message)); @@ -117,14 +113,12 @@ impl Client { /// This function sends an unsubscription request for the specified method /// and subscription ID. It waits for the response to confirm the unsubscription. pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { - let request = self.send_request(method, json!(sub_id)).await?; - debug!("--> {request}"); + let request = self.send_request(method, sub_id).await?; let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::SubscribeError(error.code, error.message)); @@ -144,12 +138,11 @@ impl Client { params: T, ) -> Result { let id = random_64(); - let request = message::Request { jsonrpc: message::JSONRPC_VERSION.to_string(), id: json!(id), method: method.to_string(), - params: json!(params), + params: Some(json!(params)), }; let req_json = serde_json::to_value(&request)?; @@ -164,6 +157,7 @@ impl Client { } } + debug!("--> {request}"); Ok(request) } @@ -182,19 +176,16 @@ impl Client { loop { let msg = selfc.conn.recv().await?; if let Ok(res) = serde_json::from_value::(msg.clone()) { + debug!("<-- {res}"); selfc.chan_tx.send(res).await?; - continue; } if let Ok(nt) = serde_json::from_value::(msg.clone()) { + debug!("<-- {nt}"); let sub_result: message::NotificationResult = match nt.params { - Some(p) => serde_json::from_value(p)?, - None => { - return Err(Error::InvalidMsg( - "Invalid notification msg: subscription id not found", - )) - } + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), }; match selfc @@ -232,7 +223,7 @@ pub struct ClientBuilder { } impl ClientBuilder { - /// Set timeout for requests, in milliseconds. + /// Set timeout for sending and receiving messages, in milliseconds. /// /// # Examples /// -- cgit v1.2.3