From d1c816660c0583db33d160e2ef3e980bef0d5a85 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 27 May 2024 00:59:23 +0200 Subject: p2p: WIP rpc server implementation for the p2p monitor --- jsonrpc/src/client/mod.rs | 27 +++++++++------------------ jsonrpc/src/error.rs | 6 +++--- jsonrpc/src/message.rs | 5 +++-- jsonrpc/src/server/mod.rs | 9 ++++++--- 4 files changed, 21 insertions(+), 26 deletions(-) (limited to 'jsonrpc') diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 5e3a24f..b614c95 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -26,7 +26,7 @@ use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig} const CHANNEL_CAP: usize = 10; -const DEFAULT_TIMEOUT: u64 = 1000; // 1s +const DEFAULT_TIMEOUT: u64 = 3000; // 3s /// Type alias for a subscription to receive notifications. /// @@ -52,13 +52,11 @@ impl Client { params: T, ) -> Result { let request = self.send_request(method, params).await?; - debug!("--> {request}"); let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::CallError(error.code, error.message)); @@ -85,13 +83,11 @@ impl Client { params: T, ) -> Result<(SubscriptionID, Subscription)> { let request = self.send_request(method, params).await?; - debug!("--> {request}"); let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::SubscribeError(error.code, error.message)); @@ -117,14 +113,12 @@ impl Client { /// This function sends an unsubscription request for the specified method /// and subscription ID. It waits for the response to confirm the unsubscription. pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { - let request = self.send_request(method, json!(sub_id)).await?; - debug!("--> {request}"); + let request = self.send_request(method, sub_id).await?; let response = match self.timeout { Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, None => self.chan_rx.recv().await?, }; - debug!("<-- {response}"); if let Some(error) = response.error { return Err(Error::SubscribeError(error.code, error.message)); @@ -144,12 +138,11 @@ impl Client { params: T, ) -> Result { let id = random_64(); - let request = message::Request { jsonrpc: message::JSONRPC_VERSION.to_string(), id: json!(id), method: method.to_string(), - params: json!(params), + params: Some(json!(params)), }; let req_json = serde_json::to_value(&request)?; @@ -164,6 +157,7 @@ impl Client { } } + debug!("--> {request}"); Ok(request) } @@ -182,19 +176,16 @@ impl Client { loop { let msg = selfc.conn.recv().await?; if let Ok(res) = serde_json::from_value::(msg.clone()) { + debug!("<-- {res}"); selfc.chan_tx.send(res).await?; - continue; } if let Ok(nt) = serde_json::from_value::(msg.clone()) { + debug!("<-- {nt}"); let sub_result: message::NotificationResult = match nt.params { - Some(p) => serde_json::from_value(p)?, - None => { - return Err(Error::InvalidMsg( - "Invalid notification msg: subscription id not found", - )) - } + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), }; match selfc @@ -232,7 +223,7 @@ pub struct ClientBuilder { } impl ClientBuilder { - /// Set timeout for requests, in milliseconds. + /// Set timeout for sending and receiving messages, in milliseconds. /// /// # Examples /// diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index e1cb071..3994bcf 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -41,8 +41,8 @@ pub enum Error { #[error(transparent)] ChannelRecv(#[from] async_channel::RecvError), - #[error("Channel broadcast Error: {0}")] - ChannelBroadcast(String), + #[error("Channel send Error: {0}")] + ChannelSend(String), #[error("Unexpected Error: {0}")] General(&'static str), @@ -56,6 +56,6 @@ pub enum Error { impl From> for Error { fn from(error: async_channel::SendError) -> Self { - Error::ChannelBroadcast(error.to_string()) + Error::ChannelSend(error.to_string()) } } diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 55f8314..34d6235 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -24,9 +24,10 @@ pub const INTERNAL_ERROR_CODE: i32 = -32603; #[derive(Debug, Serialize, Deserialize)] pub struct Request { pub jsonrpc: String, - pub method: String, - pub params: serde_json::Value, pub id: serde_json::Value, + pub method: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option, } #[derive(Debug, Serialize, Deserialize)] diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7e9e969..e1805e1 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -4,7 +4,7 @@ pub mod service; use std::{collections::HashMap, sync::Arc}; -use log::{debug, error, warn}; +use log::{debug, error, trace, warn}; #[cfg(feature = "smol")] use futures_rustls::rustls; @@ -212,6 +212,7 @@ impl Server { channel: ArcChannel, msg: serde_json::Value, ) { + trace!("--> new request {msg}"); let on_failure = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Failed to handle a request: {err}"); @@ -250,7 +251,8 @@ impl Server { if let Some(service) = self.pubsub_services.get(&req.srvc_name) { if let Some(method) = service.get_pubsub_method(&req.method_name) { let name = format!("{}.{}", service.name(), req.method_name); - response.result = match method(channel, name, req.msg.params.clone()).await { + let params = req.msg.params.unwrap_or(serde_json::json!(())); + response.result = match method(channel, name, params).await { Ok(res) => Some(res), Err(err) => return self.handle_error(err, req.msg.id), }; @@ -261,7 +263,8 @@ impl Server { if let Some(service) = self.services.get(&req.srvc_name) { if let Some(method) = service.get_method(&req.method_name) { - response.result = match method(req.msg.params.clone()).await { + let params = req.msg.params.unwrap_or(serde_json::json!(())); + response.result = match method(params).await { Ok(res) => Some(res), Err(err) => return self.handle_error(err, req.msg.id), }; -- cgit v1.2.3