aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-27 00:59:23 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-27 00:59:23 +0200
commitd1c816660c0583db33d160e2ef3e980bef0d5a85 (patch)
tree9eb06e6dbfbe34c6c2f85eee8d2e337b155be103 /jsonrpc/src
parent385d53ec53e750e342cce78edb793958edf5133e (diff)
p2p: WIP rpc server implementation for the p2p monitor
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/client/mod.rs27
-rw-r--r--jsonrpc/src/error.rs6
-rw-r--r--jsonrpc/src/message.rs5
-rw-r--r--jsonrpc/src/server/mod.rs9
4 files changed, 21 insertions, 26 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 5e3a24f..b614c95 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -26,7 +26,7 @@ use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig}
const CHANNEL_CAP: usize = 10;
-const DEFAULT_TIMEOUT: u64 = 1000; // 1s
+const DEFAULT_TIMEOUT: u64 = 3000; // 3s
/// Type alias for a subscription to receive notifications.
///
@@ -52,13 +52,11 @@ impl Client {
params: T,
) -> Result<V> {
let request = self.send_request(method, params).await?;
- debug!("--> {request}");
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::CallError(error.code, error.message));
@@ -85,13 +83,11 @@ impl Client {
params: T,
) -> Result<(SubscriptionID, Subscription)> {
let request = self.send_request(method, params).await?;
- debug!("--> {request}");
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::SubscribeError(error.code, error.message));
@@ -117,14 +113,12 @@ impl Client {
/// This function sends an unsubscription request for the specified method
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
- let request = self.send_request(method, json!(sub_id)).await?;
- debug!("--> {request}");
+ let request = self.send_request(method, sub_id).await?;
let response = match self.timeout {
Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
None => self.chan_rx.recv().await?,
};
- debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::SubscribeError(error.code, error.message));
@@ -144,12 +138,11 @@ impl Client {
params: T,
) -> Result<message::Request> {
let id = random_64();
-
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
id: json!(id),
method: method.to_string(),
- params: json!(params),
+ params: Some(json!(params)),
};
let req_json = serde_json::to_value(&request)?;
@@ -164,6 +157,7 @@ impl Client {
}
}
+ debug!("--> {request}");
Ok(request)
}
@@ -182,19 +176,16 @@ impl Client {
loop {
let msg = selfc.conn.recv().await?;
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
+ debug!("<-- {res}");
selfc.chan_tx.send(res).await?;
-
continue;
}
if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
+ debug!("<-- {nt}");
let sub_result: message::NotificationResult = match nt.params {
- Some(p) => serde_json::from_value(p)?,
- None => {
- return Err(Error::InvalidMsg(
- "Invalid notification msg: subscription id not found",
- ))
- }
+ Some(ref p) => serde_json::from_value(p.clone())?,
+ None => return Err(Error::InvalidMsg("Invalid notification msg")),
};
match selfc
@@ -232,7 +223,7 @@ pub struct ClientBuilder {
}
impl ClientBuilder {
- /// Set timeout for requests, in milliseconds.
+ /// Set timeout for sending and receiving messages, in milliseconds.
///
/// # Examples
///
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index e1cb071..3994bcf 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -41,8 +41,8 @@ pub enum Error {
#[error(transparent)]
ChannelRecv(#[from] async_channel::RecvError),
- #[error("Channel broadcast Error: {0}")]
- ChannelBroadcast(String),
+ #[error("Channel send Error: {0}")]
+ ChannelSend(String),
#[error("Unexpected Error: {0}")]
General(&'static str),
@@ -56,6 +56,6 @@ pub enum Error {
impl<T> From<async_channel::SendError<T>> for Error {
fn from(error: async_channel::SendError<T>) -> Self {
- Error::ChannelBroadcast(error.to_string())
+ Error::ChannelSend(error.to_string())
}
}
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 55f8314..34d6235 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -24,9 +24,10 @@ pub const INTERNAL_ERROR_CODE: i32 = -32603;
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
pub jsonrpc: String,
- pub method: String,
- pub params: serde_json::Value,
pub id: serde_json::Value,
+ pub method: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub params: Option<serde_json::Value>,
}
#[derive(Debug, Serialize, Deserialize)]
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7e9e969..e1805e1 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -4,7 +4,7 @@ pub mod service;
use std::{collections::HashMap, sync::Arc};
-use log::{debug, error, warn};
+use log::{debug, error, trace, warn};
#[cfg(feature = "smol")]
use futures_rustls::rustls;
@@ -212,6 +212,7 @@ impl Server {
channel: ArcChannel,
msg: serde_json::Value,
) {
+ trace!("--> new request {msg}");
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Failed to handle a request: {err}");
@@ -250,7 +251,8 @@ impl Server {
if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
if let Some(method) = service.get_pubsub_method(&req.method_name) {
let name = format!("{}.{}", service.name(), req.method_name);
- response.result = match method(channel, name, req.msg.params.clone()).await {
+ let params = req.msg.params.unwrap_or(serde_json::json!(()));
+ response.result = match method(channel, name, params).await {
Ok(res) => Some(res),
Err(err) => return self.handle_error(err, req.msg.id),
};
@@ -261,7 +263,8 @@ impl Server {
if let Some(service) = self.services.get(&req.srvc_name) {
if let Some(method) = service.get_method(&req.method_name) {
- response.result = match method(req.msg.params.clone()).await {
+ let params = req.msg.params.unwrap_or(serde_json::json!(()));
+ response.result = match method(params).await {
Ok(res) => Some(res),
Err(err) => return self.handle_error(err, req.msg.id),
};