From d1c816660c0583db33d160e2ef3e980bef0d5a85 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@karyontech.net>
Date: Mon, 27 May 2024 00:59:23 +0200
Subject: p2p: WIP rpc server implementation for the p2p monitor

---
 jsonrpc/src/client/mod.rs | 27 +++++++++------------------
 jsonrpc/src/error.rs      |  6 +++---
 jsonrpc/src/message.rs    |  5 +++--
 jsonrpc/src/server/mod.rs |  9 ++++++---
 4 files changed, 21 insertions(+), 26 deletions(-)

(limited to 'jsonrpc/src')

diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 5e3a24f..b614c95 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -26,7 +26,7 @@ use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig}
 
 const CHANNEL_CAP: usize = 10;
 
-const DEFAULT_TIMEOUT: u64 = 1000; // 1s
+const DEFAULT_TIMEOUT: u64 = 3000; // 3s
 
 /// Type alias for a subscription to receive notifications.
 ///
@@ -52,13 +52,11 @@ impl Client {
         params: T,
     ) -> Result<V> {
         let request = self.send_request(method, params).await?;
-        debug!("--> {request}");
 
         let response = match self.timeout {
             Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
             None => self.chan_rx.recv().await?,
         };
-        debug!("<-- {response}");
 
         if let Some(error) = response.error {
             return Err(Error::CallError(error.code, error.message));
@@ -85,13 +83,11 @@ impl Client {
         params: T,
     ) -> Result<(SubscriptionID, Subscription)> {
         let request = self.send_request(method, params).await?;
-        debug!("--> {request}");
 
         let response = match self.timeout {
             Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
             None => self.chan_rx.recv().await?,
         };
-        debug!("<-- {response}");
 
         if let Some(error) = response.error {
             return Err(Error::SubscribeError(error.code, error.message));
@@ -117,14 +113,12 @@ impl Client {
     /// This function sends an unsubscription request for the specified method
     /// and subscription ID. It waits for the response to confirm the unsubscription.
     pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
-        let request = self.send_request(method, json!(sub_id)).await?;
-        debug!("--> {request}");
+        let request = self.send_request(method, sub_id).await?;
 
         let response = match self.timeout {
             Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
             None => self.chan_rx.recv().await?,
         };
-        debug!("<-- {response}");
 
         if let Some(error) = response.error {
             return Err(Error::SubscribeError(error.code, error.message));
@@ -144,12 +138,11 @@ impl Client {
         params: T,
     ) -> Result<message::Request> {
         let id = random_64();
-
         let request = message::Request {
             jsonrpc: message::JSONRPC_VERSION.to_string(),
             id: json!(id),
             method: method.to_string(),
-            params: json!(params),
+            params: Some(json!(params)),
         };
 
         let req_json = serde_json::to_value(&request)?;
@@ -164,6 +157,7 @@ impl Client {
             }
         }
 
+        debug!("--> {request}");
         Ok(request)
     }
 
@@ -182,19 +176,16 @@ impl Client {
                 loop {
                     let msg = selfc.conn.recv().await?;
                     if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
+                        debug!("<-- {res}");
                         selfc.chan_tx.send(res).await?;
-
                         continue;
                     }
 
                     if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
+                        debug!("<-- {nt}");
                         let sub_result: message::NotificationResult = match nt.params {
-                            Some(p) => serde_json::from_value(p)?,
-                            None => {
-                                return Err(Error::InvalidMsg(
-                                    "Invalid notification msg: subscription id not found",
-                                ))
-                            }
+                            Some(ref p) => serde_json::from_value(p.clone())?,
+                            None => return Err(Error::InvalidMsg("Invalid notification msg")),
                         };
 
                         match selfc
@@ -232,7 +223,7 @@ pub struct ClientBuilder {
 }
 
 impl ClientBuilder {
-    /// Set timeout for requests, in milliseconds.
+    /// Set timeout for sending and receiving messages, in milliseconds.
     ///
     /// # Examples
     ///
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index e1cb071..3994bcf 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -41,8 +41,8 @@ pub enum Error {
     #[error(transparent)]
     ChannelRecv(#[from] async_channel::RecvError),
 
-    #[error("Channel broadcast Error: {0}")]
-    ChannelBroadcast(String),
+    #[error("Channel send  Error: {0}")]
+    ChannelSend(String),
 
     #[error("Unexpected Error: {0}")]
     General(&'static str),
@@ -56,6 +56,6 @@ pub enum Error {
 
 impl<T> From<async_channel::SendError<T>> for Error {
     fn from(error: async_channel::SendError<T>) -> Self {
-        Error::ChannelBroadcast(error.to_string())
+        Error::ChannelSend(error.to_string())
     }
 }
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 55f8314..34d6235 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -24,9 +24,10 @@ pub const INTERNAL_ERROR_CODE: i32 = -32603;
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Request {
     pub jsonrpc: String,
-    pub method: String,
-    pub params: serde_json::Value,
     pub id: serde_json::Value,
+    pub method: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub params: Option<serde_json::Value>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7e9e969..e1805e1 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -4,7 +4,7 @@ pub mod service;
 
 use std::{collections::HashMap, sync::Arc};
 
-use log::{debug, error, warn};
+use log::{debug, error, trace, warn};
 
 #[cfg(feature = "smol")]
 use futures_rustls::rustls;
@@ -212,6 +212,7 @@ impl Server {
         channel: ArcChannel,
         msg: serde_json::Value,
     ) {
+        trace!("--> new request {msg}");
         let on_failure = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 error!("Failed to handle a request: {err}");
@@ -250,7 +251,8 @@ impl Server {
         if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
             if let Some(method) = service.get_pubsub_method(&req.method_name) {
                 let name = format!("{}.{}", service.name(), req.method_name);
-                response.result = match method(channel, name, req.msg.params.clone()).await {
+                let params = req.msg.params.unwrap_or(serde_json::json!(()));
+                response.result = match method(channel, name, params).await {
                     Ok(res) => Some(res),
                     Err(err) => return self.handle_error(err, req.msg.id),
                 };
@@ -261,7 +263,8 @@ impl Server {
 
         if let Some(service) = self.services.get(&req.srvc_name) {
             if let Some(method) = service.get_method(&req.method_name) {
-                response.result = match method(req.msg.params.clone()).await {
+                let params = req.msg.params.unwrap_or(serde_json::json!(()));
+                response.result = match method(params).await {
                     Ok(res) => Some(res),
                     Err(err) => return self.handle_error(err, req.msg.id),
                 };
-- 
cgit v1.2.3