diff options
Diffstat (limited to 'jsonrpc')
-rw-r--r-- | jsonrpc/README.md | 14 | ||||
-rw-r--r-- | jsonrpc/examples/pubsub_server.rs | 12 | ||||
-rw-r--r-- | jsonrpc/examples/server.rs | 10 | ||||
-rw-r--r-- | jsonrpc/examples/tokio_server/src/main.rs | 16 | ||||
-rw-r--r-- | jsonrpc/src/error.rs | 44 | ||||
-rw-r--r-- | jsonrpc/src/lib.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/message.rs | 58 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 83 | ||||
-rw-r--r-- | jsonrpc/src/server/pubsub_service.rs | 10 | ||||
-rw-r--r-- | jsonrpc/src/server/service.rs | 10 | ||||
-rw-r--r-- | jsonrpc/tests/impl_rpc_service.rs | 4 | ||||
-rw-r--r-- | jsonrpc/tests/rpc_impl.rs | 4 |
12 files changed, 160 insertions, 107 deletions
diff --git a/jsonrpc/README.md b/jsonrpc/README.md index 8127727..4f016b4 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -31,7 +31,7 @@ use serde_json::Value; use smol::stream::StreamExt; use karyon_jsonrpc::{ - Error, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID, + RPCError, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID, Channel }; @@ -39,30 +39,30 @@ struct HelloWorld {} #[rpc_impl] impl HelloWorld { - async fn say_hello(&self, params: Value) -> Result<Value, Error> { + async fn say_hello(&self, params: Value) -> Result<Value, RPCError> { let msg: String = serde_json::from_value(params)?; Ok(serde_json::json!(format!("Hello {msg}!"))) } - async fn foo(&self, params: Value) -> Result<Value, Error> { + async fn foo(&self, params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!("foo!")) } - async fn bar(&self, params: Value) -> Result<Value, Error> { + async fn bar(&self, params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!("bar!")) } } #[rpc_pubsub_impl] impl HelloWorld { - async fn log_subscribe(&self, chan: Arc<Channel>, method: String, _params: Value) -> Result<Value, Error> { + async fn log_subscribe(&self, chan: Arc<Channel>, method: String, _params: Value) -> Result<Value, RPCError> { let sub = chan.new_subscription(&method).await; let sub_id = sub.id.clone(); smol::spawn(async move { loop { smol::Timer::after(std::time::Duration::from_secs(1)).await; if let Err(err) = sub.notify(serde_json::json!("Hello")).await { - println!("Error send notification {err}"); + println!("Failed to send notification: {err}"); break; } } @@ -72,7 +72,7 @@ impl HelloWorld { Ok(serde_json::json!(sub_id)) } - async fn log_unsubscribe(&self, chan: Arc<Channel>, method: String, params: Value) -> Result<Value, Error> { + async fn log_unsubscribe(&self, chan: Arc<Channel>, method: String, params: Value) -> Result<Value, RPCError> { let sub_id: SubscriptionID = serde_json::from_value(params)?; chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs index 40ea756..d1623c1 100644 --- a/jsonrpc/examples/pubsub_server.rs +++ b/jsonrpc/examples/pubsub_server.rs @@ -5,7 +5,9 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use karyon_core::async_util::sleep; -use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server}; +use karyon_jsonrpc::{ + message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server, +}; struct Calc {} @@ -20,7 +22,7 @@ struct Pong {} #[rpc_impl] impl Calc { - async fn ping(&self, _params: Value) -> Result<Value, Error> { + async fn ping(&self, _params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!(Pong {})) } } @@ -32,14 +34,14 @@ impl Calc { chan: Arc<Channel>, method: String, _params: Value, - ) -> Result<Value, Error> { + ) -> Result<Value, RPCError> { let sub = chan.new_subscription(&method).await; let sub_id = sub.id.clone(); smol::spawn(async move { loop { sleep(Duration::from_millis(500)).await; if let Err(err) = sub.notify(serde_json::json!("Hello")).await { - error!("Error send notification {err}"); + error!("Send notification {err}"); break; } } @@ -54,7 +56,7 @@ impl Calc { chan: Arc<Channel>, _method: String, params: Value, - ) -> Result<Value, Error> { + ) -> Result<Value, RPCError> { let sub_id: SubscriptionID = serde_json::from_value(params)?; chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 31e65dd..acbe2a9 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use karyon_core::async_util::sleep; -use karyon_jsonrpc::{rpc_impl, Error, Server}; +use karyon_jsonrpc::{rpc_impl, RPCError, Server}; struct Calc { version: String, @@ -21,21 +21,21 @@ struct Pong {} #[rpc_impl] impl Calc { - async fn ping(&self, _params: Value) -> Result<Value, Error> { + async fn ping(&self, _params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!(Pong {})) } - async fn add(&self, params: Value) -> Result<Value, Error> { + async fn add(&self, params: Value) -> Result<Value, RPCError> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x + params.y)) } - async fn sub(&self, params: Value) -> Result<Value, Error> { + async fn sub(&self, params: Value) -> Result<Value, RPCError> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x - params.y)) } - async fn version(&self, _params: Value) -> Result<Value, Error> { + async fn version(&self, _params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!(self.version)) } } diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index a9b2b32..5a8d604 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server}; +use karyon_jsonrpc::{ + message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCError, Server, +}; struct Calc { version: String, @@ -20,21 +22,21 @@ struct Pong {} #[rpc_impl] impl Calc { - async fn ping(&self, _params: Value) -> Result<Value, Error> { + async fn ping(&self, _params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!(Pong {})) } - async fn add(&self, params: Value) -> Result<Value, Error> { + async fn add(&self, params: Value) -> Result<Value, RPCError> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x + params.y)) } - async fn sub(&self, params: Value) -> Result<Value, Error> { + async fn sub(&self, params: Value) -> Result<Value, RPCError> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x - params.y)) } - async fn version(&self, _params: Value) -> Result<Value, Error> { + async fn version(&self, _params: Value) -> Result<Value, RPCError> { Ok(serde_json::json!(self.version)) } } @@ -46,7 +48,7 @@ impl Calc { chan: Arc<Channel>, method: String, _params: Value, - ) -> Result<Value, Error> { + ) -> Result<Value, RPCError> { let sub = chan.new_subscription(&method).await; let sub_id = sub.id; tokio::spawn(async move { @@ -66,7 +68,7 @@ impl Calc { chan: Arc<Channel>, _method: String, params: Value, - ) -> Result<Value, Error> { + ) -> Result<Value, RPCError> { let sub_id: SubscriptionID = serde_json::from_value(params)?; chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 3994bcf..89d0e2f 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -14,21 +14,12 @@ pub enum Error { #[error("Subscribe Error: code: {0} msg: {1}")] SubscribeError(i32, String), - #[error("RPC Method Error: code: {0} msg: {1}")] - RPCMethodError(i32, &'static str), - - #[error("Invalid Params: {0}")] - InvalidParams(&'static str), - - #[error("Invalid Request: {0}")] - InvalidRequest(&'static str), + #[error("Invalid Message Error: {0}")] + InvalidMsg(&'static str), #[error(transparent)] ParseJSON(#[from] serde_json::Error), - #[error("Invalid Message Error: {0}")] - InvalidMsg(&'static str), - #[error("Unsupported protocol: {0}")] UnsupportedProtocol(String), @@ -42,7 +33,7 @@ pub enum Error { ChannelRecv(#[from] async_channel::RecvError), #[error("Channel send Error: {0}")] - ChannelSend(String), + ChannelSend(&'static str), #[error("Unexpected Error: {0}")] General(&'static str), @@ -56,6 +47,33 @@ pub enum Error { impl<T> From<async_channel::SendError<T>> for Error { fn from(error: async_channel::SendError<T>) -> Self { - Error::ChannelSend(error.to_string()) + Error::ChannelSend(error.to_string().leak()) + } +} + +pub type RPCResult<T> = std::result::Result<T, RPCError>; + +/// Represents RPC Error. +#[derive(ThisError, Debug)] +pub enum RPCError { + #[error("Custom Error: code: {0} msg: {1}")] + CustomError(i32, &'static str), + + #[error("Invalid Params: {0}")] + InvalidParams(&'static str), + + #[error("Invalid Request: {0}")] + InvalidRequest(&'static str), + + #[error("Parse Error: {0}")] + ParseError(&'static str), + + #[error("Internal Error")] + InternalError, +} + +impl From<serde_json::Error> for RPCError { + fn from(error: serde_json::Error) -> Self { + RPCError::ParseError(error.to_string().leak()) } } diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 23a6e08..d43783f 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -7,7 +7,7 @@ pub mod message; mod server; pub use client::{builder::ClientBuilder, Client}; -pub use error::{Error, Result}; +pub use error::{Error, RPCError, RPCResult, Result}; pub use server::{ builder::ServerBuilder, channel::{Channel, Subscription}, diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 36ece38..deb5d77 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +use crate::RPCError; + pub type ID = u64; pub const JSONRPC_VERSION: &str = "2.0"; @@ -22,6 +24,8 @@ pub const INTERNAL_ERROR_CODE: i32 = -32603; /// SubscriptionID is used to identify a subscription. pub type SubscriptionID = u32; +pub const INTERNAL_ERROR_MSG: &str = "Internal error"; + /// Request represents a JSON-RPC request message. /// It includes the JSON-RPC version, an identifier for the request, the method /// to be invoked, and optional parameters. @@ -131,3 +135,57 @@ impl std::fmt::Display for Notification { ) } } + +impl Default for Response { + fn default() -> Self { + Self { + jsonrpc: JSONRPC_VERSION.to_string(), + error: None, + id: None, + result: None, + } + } +} + +impl RPCError { + pub fn to_response( + &self, + id: Option<serde_json::Value>, + data: Option<serde_json::Value>, + ) -> Response { + let err: Error = match self { + RPCError::ParseError(msg) => Error { + code: PARSE_ERROR_CODE, + message: msg.to_string(), + data, + }, + RPCError::InvalidParams(msg) => Error { + code: INVALID_PARAMS_ERROR_CODE, + message: msg.to_string(), + data, + }, + RPCError::InvalidRequest(msg) => Error { + code: INVALID_REQUEST_ERROR_CODE, + message: msg.to_string(), + data, + }, + RPCError::CustomError(code, msg) => Error { + code: *code, + message: msg.to_string(), + data, + }, + RPCError::InternalError => Error { + code: INTERNAL_ERROR_CODE, + message: INTERNAL_ERROR_MSG.to_string(), + data, + }, + }; + + Response { + jsonrpc: JSONRPC_VERSION.to_string(), + error: Some(err), + result: None, + id, + } + } +} diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 86b1b31..dd176d0 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -19,7 +19,6 @@ use response_queue::ResponseQueue; pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; -pub const INTERNAL_ERROR_MSG: &str = "Internal error"; struct NewRequest { srvc_name: String, @@ -165,11 +164,15 @@ impl Server { let rpc_msg = match serde_json::from_value::<message::Request>(request) { Ok(m) => m, Err(_) => { - return SanityCheckResult::ErrRes(pack_err_res( - message::PARSE_ERROR_CODE, - FAILED_TO_PARSE_ERROR_MSG, - None, - )); + let response = message::Response { + error: Some(message::Error { + code: message::PARSE_ERROR_CODE, + message: FAILED_TO_PARSE_ERROR_MSG.to_string(), + data: None, + }), + ..Default::default() + }; + return SanityCheckResult::ErrRes(response); } }; debug!("<-- {rpc_msg}"); @@ -178,11 +181,16 @@ impl Server { let srvc_method_str = rpc_msg.method.clone(); let srvc_method: Vec<&str> = srvc_method_str.split('.').collect(); if srvc_method.len() < 2 { - return SanityCheckResult::ErrRes(pack_err_res( - message::INVALID_REQUEST_ERROR_CODE, - INVALID_REQUEST_ERROR_MSG, - Some(rpc_msg.id), - )); + let response = message::Response { + error: Some(message::Error { + code: message::INVALID_REQUEST_ERROR_CODE, + message: INVALID_REQUEST_ERROR_MSG.to_string(), + data: None, + }), + id: Some(rpc_msg.id), + ..Default::default() + }; + return SanityCheckResult::ErrRes(response); } let srvc_name = srvc_method[0].to_string(); @@ -235,10 +243,10 @@ impl Server { }; let mut response = message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, result: None, id: Some(req.msg.id.clone()), + ..Default::default() }; // Check if the service exists in pubsub services list @@ -249,7 +257,7 @@ impl Server { let params = req.msg.params.unwrap_or(serde_json::json!(())); response.result = match method(channel, name, params).await { Ok(res) => Some(res), - Err(err) => return self.handle_error(err, req.msg.id), + Err(err) => return err.to_response(Some(req.msg.id), None), }; return response; @@ -263,54 +271,19 @@ impl Server { let params = req.msg.params.unwrap_or(serde_json::json!(())); response.result = match method(params).await { Ok(res) => Some(res), - Err(err) => return self.handle_error(err, req.msg.id), + Err(err) => return err.to_response(Some(req.msg.id), None), }; return response; } } - pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(req.msg.id), - ) - } - - fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response { - match err { - Error::ParseJSON(_) => pack_err_res( - message::PARSE_ERROR_CODE, - FAILED_TO_PARSE_ERROR_MSG, - Some(msg_id), - ), - Error::InvalidParams(msg) => { - pack_err_res(message::INVALID_PARAMS_ERROR_CODE, msg, Some(msg_id)) - } - Error::InvalidRequest(msg) => { - pack_err_res(message::INVALID_REQUEST_ERROR_CODE, msg, Some(msg_id)) - } - Error::RPCMethodError(code, msg) => pack_err_res(code, msg, Some(msg_id)), - _ => pack_err_res( - message::INTERNAL_ERROR_CODE, - INTERNAL_ERROR_MSG, - Some(msg_id), - ), - } - } -} + response.error = Some(message::Error { + code: message::METHOD_NOT_FOUND_ERROR_CODE, + message: METHOD_NOT_FOUND_ERROR_MSG.to_string(), + data: None, + }); -fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response { - let err = message::Error { - code, - message: msg.to_string(), - data: None, - }; - - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: Some(err), - result: None, - id, + response } } diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index 08d1bbb..a6b4c11 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -1,6 +1,6 @@ use std::{future::Future, pin::Pin, sync::Arc}; -use crate::Result; +use crate::RPCResult; use super::channel::Channel; @@ -8,7 +8,7 @@ use super::channel::Channel; pub type PubSubRPCMethod<'a> = Box<dyn Fn(Arc<Channel>, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>; type PubSubRPCMethodOutput<'a> = - Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>; + Pin<Box<dyn Future<Output = RPCResult<serde_json::Value>> + Send + Sync + 'a>>; /// Defines the interface for an RPC service. pub trait PubSubRPCService: Sync + Send { @@ -23,16 +23,16 @@ pub trait PubSubRPCService: Sync + Send { /// ``` /// use serde_json::Value; /// -/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; /// /// struct Hello {} /// /// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, Error> { +/// async fn foo(&self, params: Value) -> Result<Value, RPCError> { /// Ok(serde_json::json!("foo!")) /// } /// -/// async fn bar(&self, params: Value) -> Result<Value, Error> { +/// async fn bar(&self, params: Value) -> Result<Value, RPCError> { /// Ok(serde_json::json!("bar!")) /// } /// } diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs index 4c8c4b8..9cc1d21 100644 --- a/jsonrpc/src/server/service.rs +++ b/jsonrpc/src/server/service.rs @@ -1,11 +1,11 @@ use std::{future::Future, pin::Pin}; -use crate::Result; +use crate::RPCResult; /// Represents the RPC method pub type RPCMethod<'a> = Box<dyn Fn(serde_json::Value) -> RPCMethodOutput<'a> + Send + 'a>; type RPCMethodOutput<'a> = - Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>; + Pin<Box<dyn Future<Output = RPCResult<serde_json::Value>> + Send + Sync + 'a>>; /// Defines the interface for an RPC service. pub trait RPCService: Sync + Send { @@ -20,16 +20,16 @@ pub trait RPCService: Sync + Send { /// ``` /// use serde_json::Value; /// -/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; /// /// struct Hello {} /// /// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, Error> { +/// async fn foo(&self, params: Value) -> Result<Value, RPCError> { /// Ok(serde_json::json!("foo!")) /// } /// -/// async fn bar(&self, params: Value) -> Result<Value, Error> { +/// async fn bar(&self, params: Value) -> Result<Value, RPCError> { /// Ok(serde_json::json!("bar!")) /// } /// } diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs index e590ae1..bb42679 100644 --- a/jsonrpc/tests/impl_rpc_service.rs +++ b/jsonrpc/tests/impl_rpc_service.rs @@ -1,4 +1,4 @@ -use karyon_jsonrpc::{impl_rpc_service, Error, RPCService}; +use karyon_jsonrpc::{impl_rpc_service, RPCError, RPCService}; use serde_json::Value; #[test] @@ -6,7 +6,7 @@ fn service() { struct Foo {} impl Foo { - async fn foo(&self, params: Value) -> Result<Value, Error> { + async fn foo(&self, params: Value) -> Result<Value, RPCError> { Ok(params) } } diff --git a/jsonrpc/tests/rpc_impl.rs b/jsonrpc/tests/rpc_impl.rs index 5b14b59..64e3bb1 100644 --- a/jsonrpc/tests/rpc_impl.rs +++ b/jsonrpc/tests/rpc_impl.rs @@ -1,4 +1,4 @@ -use karyon_jsonrpc::{rpc_impl, Error, RPCService}; +use karyon_jsonrpc::{rpc_impl, RPCError, RPCService}; use serde_json::Value; #[test] @@ -7,7 +7,7 @@ fn rpc_impl_service() { #[rpc_impl] impl Foo { - async fn foo(&self, params: Value) -> Result<Value, Error> { + async fn foo(&self, params: Value) -> Result<Value, RPCError> { Ok(params) } } |