aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jsonrpc/README.md14
-rw-r--r--jsonrpc/examples/pubsub_server.rs12
-rw-r--r--jsonrpc/examples/server.rs10
-rw-r--r--jsonrpc/examples/tokio_server/src/main.rs16
-rw-r--r--jsonrpc/src/error.rs44
-rw-r--r--jsonrpc/src/lib.rs2
-rw-r--r--jsonrpc/src/message.rs58
-rw-r--r--jsonrpc/src/server/mod.rs83
-rw-r--r--jsonrpc/src/server/pubsub_service.rs10
-rw-r--r--jsonrpc/src/server/service.rs10
-rw-r--r--jsonrpc/tests/impl_rpc_service.rs4
-rw-r--r--jsonrpc/tests/rpc_impl.rs4
-rw-r--r--p2p/examples/monitor/src/service.rs51
13 files changed, 181 insertions, 137 deletions
diff --git a/jsonrpc/README.md b/jsonrpc/README.md
index 8127727..4f016b4 100644
--- a/jsonrpc/README.md
+++ b/jsonrpc/README.md
@@ -31,7 +31,7 @@ use serde_json::Value;
use smol::stream::StreamExt;
use karyon_jsonrpc::{
- Error, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID,
+ RPCError, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID,
Channel
};
@@ -39,30 +39,30 @@ struct HelloWorld {}
#[rpc_impl]
impl HelloWorld {
- async fn say_hello(&self, params: Value) -> Result<Value, Error> {
+ async fn say_hello(&self, params: Value) -> Result<Value, RPCError> {
let msg: String = serde_json::from_value(params)?;
Ok(serde_json::json!(format!("Hello {msg}!")))
}
- async fn foo(&self, params: Value) -> Result<Value, Error> {
+ async fn foo(&self, params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!("foo!"))
}
- async fn bar(&self, params: Value) -> Result<Value, Error> {
+ async fn bar(&self, params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!("bar!"))
}
}
#[rpc_pubsub_impl]
impl HelloWorld {
- async fn log_subscribe(&self, chan: Arc<Channel>, method: String, _params: Value) -> Result<Value, Error> {
+ async fn log_subscribe(&self, chan: Arc<Channel>, method: String, _params: Value) -> Result<Value, RPCError> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id.clone();
smol::spawn(async move {
loop {
smol::Timer::after(std::time::Duration::from_secs(1)).await;
if let Err(err) = sub.notify(serde_json::json!("Hello")).await {
- println!("Error send notification {err}");
+ println!("Failed to send notification: {err}");
break;
}
}
@@ -72,7 +72,7 @@ impl HelloWorld {
Ok(serde_json::json!(sub_id))
}
- async fn log_unsubscribe(&self, chan: Arc<Channel>, method: String, params: Value) -> Result<Value, Error> {
+ async fn log_unsubscribe(&self, chan: Arc<Channel>, method: String, params: Value) -> Result<Value, RPCError> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs
index 40ea756..d1623c1 100644
--- a/jsonrpc/examples/pubsub_server.rs
+++ b/jsonrpc/examples/pubsub_server.rs
@@ -5,7 +5,9 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use karyon_core::async_util::sleep;
-use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server,
+};
struct Calc {}
@@ -20,7 +22,7 @@ struct Pong {}
#[rpc_impl]
impl Calc {
- async fn ping(&self, _params: Value) -> Result<Value, Error> {
+ async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!(Pong {}))
}
}
@@ -32,14 +34,14 @@ impl Calc {
chan: Arc<Channel>,
method: String,
_params: Value,
- ) -> Result<Value, Error> {
+ ) -> Result<Value, RPCError> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id.clone();
smol::spawn(async move {
loop {
sleep(Duration::from_millis(500)).await;
if let Err(err) = sub.notify(serde_json::json!("Hello")).await {
- error!("Error send notification {err}");
+ error!("Send notification {err}");
break;
}
}
@@ -54,7 +56,7 @@ impl Calc {
chan: Arc<Channel>,
_method: String,
params: Value,
- ) -> Result<Value, Error> {
+ ) -> Result<Value, RPCError> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs
index 31e65dd..acbe2a9 100644
--- a/jsonrpc/examples/server.rs
+++ b/jsonrpc/examples/server.rs
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use karyon_core::async_util::sleep;
-use karyon_jsonrpc::{rpc_impl, Error, Server};
+use karyon_jsonrpc::{rpc_impl, RPCError, Server};
struct Calc {
version: String,
@@ -21,21 +21,21 @@ struct Pong {}
#[rpc_impl]
impl Calc {
- async fn ping(&self, _params: Value) -> Result<Value, Error> {
+ async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!(Pong {}))
}
- async fn add(&self, params: Value) -> Result<Value, Error> {
+ async fn add(&self, params: Value) -> Result<Value, RPCError> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x + params.y))
}
- async fn sub(&self, params: Value) -> Result<Value, Error> {
+ async fn sub(&self, params: Value) -> Result<Value, RPCError> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x - params.y))
}
- async fn version(&self, _params: Value) -> Result<Value, Error> {
+ async fn version(&self, _params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!(self.version))
}
}
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index a9b2b32..5a8d604 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server,
+};
struct Calc {
version: String,
@@ -20,21 +22,21 @@ struct Pong {}
#[rpc_impl]
impl Calc {
- async fn ping(&self, _params: Value) -> Result<Value, Error> {
+ async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!(Pong {}))
}
- async fn add(&self, params: Value) -> Result<Value, Error> {
+ async fn add(&self, params: Value) -> Result<Value, RPCError> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x + params.y))
}
- async fn sub(&self, params: Value) -> Result<Value, Error> {
+ async fn sub(&self, params: Value) -> Result<Value, RPCError> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x - params.y))
}
- async fn version(&self, _params: Value) -> Result<Value, Error> {
+ async fn version(&self, _params: Value) -> Result<Value, RPCError> {
Ok(serde_json::json!(self.version))
}
}
@@ -46,7 +48,7 @@ impl Calc {
chan: Arc<Channel>,
method: String,
_params: Value,
- ) -> Result<Value, Error> {
+ ) -> Result<Value, RPCError> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
tokio::spawn(async move {
@@ -66,7 +68,7 @@ impl Calc {
chan: Arc<Channel>,
_method: String,
params: Value,
- ) -> Result<Value, Error> {
+ ) -> Result<Value, RPCError> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index 3994bcf..89d0e2f 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -14,21 +14,12 @@ pub enum Error {
#[error("Subscribe Error: code: {0} msg: {1}")]
SubscribeError(i32, String),
- #[error("RPC Method Error: code: {0} msg: {1}")]
- RPCMethodError(i32, &'static str),
-
- #[error("Invalid Params: {0}")]
- InvalidParams(&'static str),
-
- #[error("Invalid Request: {0}")]
- InvalidRequest(&'static str),
+ #[error("Invalid Message Error: {0}")]
+ InvalidMsg(&'static str),
#[error(transparent)]
ParseJSON(#[from] serde_json::Error),
- #[error("Invalid Message Error: {0}")]
- InvalidMsg(&'static str),
-
#[error("Unsupported protocol: {0}")]
UnsupportedProtocol(String),
@@ -42,7 +33,7 @@ pub enum Error {
ChannelRecv(#[from] async_channel::RecvError),
#[error("Channel send Error: {0}")]
- ChannelSend(String),
+ ChannelSend(&'static str),
#[error("Unexpected Error: {0}")]
General(&'static str),
@@ -56,6 +47,33 @@ pub enum Error {
impl<T> From<async_channel::SendError<T>> for Error {
fn from(error: async_channel::SendError<T>) -> Self {
- Error::ChannelSend(error.to_string())
+ Error::ChannelSend(error.to_string().leak())
+ }
+}
+
+pub type RPCResult<T> = std::result::Result<T, RPCError>;
+
+/// Represents RPC Error.
+#[derive(ThisError, Debug)]
+pub enum RPCError {
+ #[error("Custom Error: code: {0} msg: {1}")]
+ CustomError(i32, &'static str),
+
+ #[error("Invalid Params: {0}")]
+ InvalidParams(&'static str),
+
+ #[error("Invalid Request: {0}")]
+ InvalidRequest(&'static str),
+
+ #[error("Parse Error: {0}")]
+ ParseError(&'static str),
+
+ #[error("Internal Error")]
+ InternalError,
+}
+
+impl From<serde_json::Error> for RPCError {
+ fn from(error: serde_json::Error) -> Self {
+ RPCError::ParseError(error.to_string().leak())
}
}
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index 23a6e08..d43783f 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -7,7 +7,7 @@ pub mod message;
mod server;
pub use client::{builder::ClientBuilder, Client};
-pub use error::{Error, Result};
+pub use error::{Error, RPCError, RPCResult, Result};
pub use server::{
builder::ServerBuilder,
channel::{Channel, Subscription},
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 36ece38..deb5d77 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
+use crate::RPCError;
+
pub type ID = u64;
pub const JSONRPC_VERSION: &str = "2.0";
@@ -22,6 +24,8 @@ pub const INTERNAL_ERROR_CODE: i32 = -32603;
/// SubscriptionID is used to identify a subscription.
pub type SubscriptionID = u32;
+pub const INTERNAL_ERROR_MSG: &str = "Internal error";
+
/// Request represents a JSON-RPC request message.
/// It includes the JSON-RPC version, an identifier for the request, the method
/// to be invoked, and optional parameters.
@@ -131,3 +135,57 @@ impl std::fmt::Display for Notification {
)
}
}
+
+impl Default for Response {
+ fn default() -> Self {
+ Self {
+ jsonrpc: JSONRPC_VERSION.to_string(),
+ error: None,
+ id: None,
+ result: None,
+ }
+ }
+}
+
+impl RPCError {
+ pub fn to_response(
+ &self,
+ id: Option<serde_json::Value>,
+ data: Option<serde_json::Value>,
+ ) -> Response {
+ let err: Error = match self {
+ RPCError::ParseError(msg) => Error {
+ code: PARSE_ERROR_CODE,
+ message: msg.to_string(),
+ data,
+ },
+ RPCError::InvalidParams(msg) => Error {
+ code: INVALID_PARAMS_ERROR_CODE,
+ message: msg.to_string(),
+ data,
+ },
+ RPCError::InvalidRequest(msg) => Error {
+ code: INVALID_REQUEST_ERROR_CODE,
+ message: msg.to_string(),
+ data,
+ },
+ RPCError::CustomError(code, msg) => Error {
+ code: *code,
+ message: msg.to_string(),
+ data,
+ },
+ RPCError::InternalError => Error {
+ code: INTERNAL_ERROR_CODE,
+ message: INTERNAL_ERROR_MSG.to_string(),
+ data,
+ },
+ };
+
+ Response {
+ jsonrpc: JSONRPC_VERSION.to_string(),
+ error: Some(err),
+ result: None,
+ id,
+ }
+ }
+}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 86b1b31..dd176d0 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -19,7 +19,6 @@ use response_queue::ResponseQueue;
pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request";
pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
-pub const INTERNAL_ERROR_MSG: &str = "Internal error";
struct NewRequest {
srvc_name: String,
@@ -165,11 +164,15 @@ impl Server {
let rpc_msg = match serde_json::from_value::<message::Request>(request) {
Ok(m) => m,
Err(_) => {
- return SanityCheckResult::ErrRes(pack_err_res(
- message::PARSE_ERROR_CODE,
- FAILED_TO_PARSE_ERROR_MSG,
- None,
- ));
+ let response = message::Response {
+ error: Some(message::Error {
+ code: message::PARSE_ERROR_CODE,
+ message: FAILED_TO_PARSE_ERROR_MSG.to_string(),
+ data: None,
+ }),
+ ..Default::default()
+ };
+ return SanityCheckResult::ErrRes(response);
}
};
debug!("<-- {rpc_msg}");
@@ -178,11 +181,16 @@ impl Server {
let srvc_method_str = rpc_msg.method.clone();
let srvc_method: Vec<&str> = srvc_method_str.split('.').collect();
if srvc_method.len() < 2 {
- return SanityCheckResult::ErrRes(pack_err_res(
- message::INVALID_REQUEST_ERROR_CODE,
- INVALID_REQUEST_ERROR_MSG,
- Some(rpc_msg.id),
- ));
+ let response = message::Response {
+ error: Some(message::Error {
+ code: message::INVALID_REQUEST_ERROR_CODE,
+ message: INVALID_REQUEST_ERROR_MSG.to_string(),
+ data: None,
+ }),
+ id: Some(rpc_msg.id),
+ ..Default::default()
+ };
+ return SanityCheckResult::ErrRes(response);
}
let srvc_name = srvc_method[0].to_string();
@@ -235,10 +243,10 @@ impl Server {
};
let mut response = message::Response {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
result: None,
id: Some(req.msg.id.clone()),
+ ..Default::default()
};
// Check if the service exists in pubsub services list
@@ -249,7 +257,7 @@ impl Server {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
response.result = match method(channel, name, params).await {
Ok(res) => Some(res),
- Err(err) => return self.handle_error(err, req.msg.id),
+ Err(err) => return err.to_response(Some(req.msg.id), None),
};
return response;
@@ -263,54 +271,19 @@ impl Server {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
response.result = match method(params).await {
Ok(res) => Some(res),
- Err(err) => return self.handle_error(err, req.msg.id),
+ Err(err) => return err.to_response(Some(req.msg.id), None),
};
return response;
}
}
- pack_err_res(
- message::METHOD_NOT_FOUND_ERROR_CODE,
- METHOD_NOT_FOUND_ERROR_MSG,
- Some(req.msg.id),
- )
- }
-
- fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response {
- match err {
- Error::ParseJSON(_) => pack_err_res(
- message::PARSE_ERROR_CODE,
- FAILED_TO_PARSE_ERROR_MSG,
- Some(msg_id),
- ),
- Error::InvalidParams(msg) => {
- pack_err_res(message::INVALID_PARAMS_ERROR_CODE, msg, Some(msg_id))
- }
- Error::InvalidRequest(msg) => {
- pack_err_res(message::INVALID_REQUEST_ERROR_CODE, msg, Some(msg_id))
- }
- Error::RPCMethodError(code, msg) => pack_err_res(code, msg, Some(msg_id)),
- _ => pack_err_res(
- message::INTERNAL_ERROR_CODE,
- INTERNAL_ERROR_MSG,
- Some(msg_id),
- ),
- }
- }
-}
+ response.error = Some(message::Error {
+ code: message::METHOD_NOT_FOUND_ERROR_CODE,
+ message: METHOD_NOT_FOUND_ERROR_MSG.to_string(),
+ data: None,
+ });
-fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
- let err = message::Error {
- code,
- message: msg.to_string(),
- data: None,
- };
-
- message::Response {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- error: Some(err),
- result: None,
- id,
+ response
}
}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 08d1bbb..a6b4c11 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin, sync::Arc};
-use crate::Result;
+use crate::RPCResult;
use super::channel::Channel;
@@ -8,7 +8,7 @@ use super::channel::Channel;
pub type PubSubRPCMethod<'a> =
Box<dyn Fn(Arc<Channel>, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
type PubSubRPCMethodOutput<'a> =
- Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
+ Pin<Box<dyn Future<Output = RPCResult<serde_json::Value>> + Send + Sync + 'a>>;
/// Defines the interface for an RPC service.
pub trait PubSubRPCService: Sync + Send {
@@ -23,16 +23,16 @@ pub trait PubSubRPCService: Sync + Send {
/// ```
/// use serde_json::Value;
///
-/// use karyon_jsonrpc::{Error, impl_rpc_service};
+/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
///
/// struct Hello {}
///
/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, Error> {
+/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
/// Ok(serde_json::json!("foo!"))
/// }
///
-/// async fn bar(&self, params: Value) -> Result<Value, Error> {
+/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
/// Ok(serde_json::json!("bar!"))
/// }
/// }
diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs
index 4c8c4b8..9cc1d21 100644
--- a/jsonrpc/src/server/service.rs
+++ b/jsonrpc/src/server/service.rs
@@ -1,11 +1,11 @@
use std::{future::Future, pin::Pin};
-use crate::Result;
+use crate::RPCResult;
/// Represents the RPC method
pub type RPCMethod<'a> = Box<dyn Fn(serde_json::Value) -> RPCMethodOutput<'a> + Send + 'a>;
type RPCMethodOutput<'a> =
- Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
+ Pin<Box<dyn Future<Output = RPCResult<serde_json::Value>> + Send + Sync + 'a>>;
/// Defines the interface for an RPC service.
pub trait RPCService: Sync + Send {
@@ -20,16 +20,16 @@ pub trait RPCService: Sync + Send {
/// ```
/// use serde_json::Value;
///
-/// use karyon_jsonrpc::{Error, impl_rpc_service};
+/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
///
/// struct Hello {}
///
/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, Error> {
+/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
/// Ok(serde_json::json!("foo!"))
/// }
///
-/// async fn bar(&self, params: Value) -> Result<Value, Error> {
+/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
/// Ok(serde_json::json!("bar!"))
/// }
/// }
diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs
index e590ae1..bb42679 100644
--- a/jsonrpc/tests/impl_rpc_service.rs
+++ b/jsonrpc/tests/impl_rpc_service.rs
@@ -1,4 +1,4 @@
-use karyon_jsonrpc::{impl_rpc_service, Error, RPCService};
+use karyon_jsonrpc::{impl_rpc_service, RPCError, RPCService};
use serde_json::Value;
#[test]
@@ -6,7 +6,7 @@ fn service() {
struct Foo {}
impl Foo {
- async fn foo(&self, params: Value) -> Result<Value, Error> {
+ async fn foo(&self, params: Value) -> Result<Value, RPCError> {
Ok(params)
}
}
diff --git a/jsonrpc/tests/rpc_impl.rs b/jsonrpc/tests/rpc_impl.rs
index 5b14b59..64e3bb1 100644
--- a/jsonrpc/tests/rpc_impl.rs
+++ b/jsonrpc/tests/rpc_impl.rs
@@ -1,4 +1,4 @@
-use karyon_jsonrpc::{rpc_impl, Error, RPCService};
+use karyon_jsonrpc::{rpc_impl, RPCError, RPCService};
use serde_json::Value;
#[test]
@@ -7,7 +7,7 @@ fn rpc_impl_service() {
#[rpc_impl]
impl Foo {
- async fn foo(&self, params: Value) -> Result<Value, Error> {
+ async fn foo(&self, params: Value) -> Result<Value, RPCError> {
Ok(params)
}
}
diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs
index 15ce8da..bc6ab81 100644
--- a/p2p/examples/monitor/src/service.rs
+++ b/p2p/examples/monitor/src/service.rs
@@ -4,10 +4,13 @@ use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};
use ringbuffer::{AllocRingBuffer, RingBuffer};
use serde::{Deserialize, Serialize};
+use serde_json::Value;
use smol::{lock::Mutex, Executor};
use karyon_core::async_util::{TaskGroup, TaskResult};
-use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Subscription};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription,
+};
use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result};
const EVENT_BUFFER_SIZE: usize = 60;
@@ -82,31 +85,19 @@ impl MonitorRPC {
#[rpc_impl]
impl MonitorRPC {
- pub async fn ping(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn ping(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(Pong {}))
}
- pub async fn peer_id(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn peer_id(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.peer_id().to_string()))
}
- pub async fn inbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn inbound_connection(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.inbound_slots()))
}
- pub async fn outbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn outbound_connection(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.outbound_slots()))
}
}
@@ -117,8 +108,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -131,8 +122,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -145,8 +136,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -159,8 +150,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -170,8 +161,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -181,8 +172,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -194,7 +185,7 @@ struct Pong {}
struct Subscriptions {
subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>,
- buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<serde_json::Value>>>,
+ buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<Value>>>,
task_group: TaskGroup,
}