From 028940fe3e0a87cdc421a6d07f1ecfb6c208b9d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 21 May 2024 02:20:45 +0200 Subject: jsonrpc: support pubsub --- jsonrpc/src/server/channel.rs | 69 ++++++ jsonrpc/src/server/mod.rs | 454 +++++++++++++++++++++++++++++++++++ jsonrpc/src/server/pubsub_service.rs | 67 ++++++ jsonrpc/src/server/service.rs | 64 +++++ 4 files changed, 654 insertions(+) create mode 100644 jsonrpc/src/server/channel.rs create mode 100644 jsonrpc/src/server/mod.rs create mode 100644 jsonrpc/src/server/pubsub_service.rs create mode 100644 jsonrpc/src/server/service.rs (limited to 'jsonrpc/src/server') diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs new file mode 100644 index 0000000..1498825 --- /dev/null +++ b/jsonrpc/src/server/channel.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; + +use karyon_core::{async_runtime::lock::Mutex, util::random_32}; + +use crate::{Error, Result}; + +pub type SubscriptionID = u32; +pub type ArcChannel = Arc; + +/// Represents a new subscription +pub struct Subscription { + pub id: SubscriptionID, + parent: Arc, + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, +} + +impl Subscription { + /// Creates a new `Subscription` + fn new( + parent: Arc, + id: SubscriptionID, + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + ) -> Self { + Self { parent, id, chan } + } + + /// Sends a notification to the subscriber + pub async fn notify(&self, res: serde_json::Value) -> Result<()> { + if self.parent.subs.lock().await.contains(&self.id) { + self.chan.send((self.id, res)).await?; + Ok(()) + } else { + Err(Error::SubscriptionNotFound(self.id.to_string())) + } + } +} + +/// Represents a channel for creating/removing subscriptions +pub struct Channel { + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + subs: Mutex>, +} + +impl Channel { + /// Creates a new `Channel` + pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel { + Arc::new(Self { + chan, + subs: Mutex::new(Vec::new()), + }) + } + + /// Creates a new subscription + pub async fn new_subscription(self: &Arc) -> Subscription { + let sub_id = random_32(); + let sub = Subscription::new(self.clone(), sub_id, self.chan.clone()); + self.subs.lock().await.push(sub_id); + sub + } + + /// Removes a subscription + pub async fn remove_subscription(self: &Arc, id: &SubscriptionID) { + let i = match self.subs.lock().await.iter().position(|i| i == id) { + Some(i) => i, + None => return, + }; + self.subs.lock().await.remove(i); + } +} diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs new file mode 100644 index 0000000..4ebab10 --- /dev/null +++ b/jsonrpc/src/server/mod.rs @@ -0,0 +1,454 @@ +pub mod channel; +pub mod pubsub_service; +pub mod service; + +use std::{collections::HashMap, sync::Arc}; + +use log::{debug, error, warn}; + +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use karyon_core::{ + async_runtime::Executor, + async_util::{select, Either, TaskGroup, TaskResult}, +}; + +use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; + +#[cfg(feature = "ws")] +use karyon_net::ws::ServerWsConfig; + +use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result, TcpConfig}; + +use channel::{ArcChannel, Channel}; + +const CHANNEL_CAP: usize = 10; + +pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; +pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; +pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; +pub const INTERNAL_ERROR_MSG: &str = "Internal error"; + +fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { + let err = message::Error { + code, + message: msg.to_string(), + data: None, + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: Some(err), + result: None, + id, + subscription: None, + } +} + +struct NewRequest { + srvc_name: String, + method_name: String, + msg: message::Request, +} + +enum SanityCheckResult { + NewReq(NewRequest), + ErrRes(message::Response), +} + +/// Represents an RPC server +pub struct Server { + listener: Listener, + task_group: TaskGroup, + services: HashMap>, + pubsub_services: HashMap>, +} + +impl Server { + /// Returns the local endpoint. + pub fn local_endpoint(&self) -> Result { + self.listener.local_endpoint().map_err(Error::from) + } + + /// Starts the RPC server + pub async fn start(self: Arc) -> Result<()> { + loop { + match self.listener.accept().await { + Ok(conn) => { + if let Err(err) = self.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } + } + } + } + + /// Shuts down the RPC server + pub async fn shutdown(&self) { + self.task_group.cancel().await; + } + + /// Handles a new connection + async fn handle_conn(self: &Arc, conn: Conn) -> Result<()> { + let endpoint = conn.peer_endpoint().expect("get peer endpoint"); + debug!("Handle a new connection {endpoint}"); + + let on_failure = |result: TaskResult>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Connection {} dropped: {}", endpoint, err); + } else { + warn!("Connection {} dropped", endpoint); + } + }; + + let selfc = self.clone(); + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let channel = Channel::new(ch_tx); + self.task_group.spawn( + async move { + loop { + match select(conn.recv(), ch_rx.recv()).await { + Either::Left(msg) => { + // TODO spawn a task + let response = selfc.handle_request(channel.clone(), msg?).await; + debug!("--> {response}"); + conn.send(serde_json::to_value(response)?).await?; + } + Either::Right(msg) => { + let (sub_id, result) = msg?; + let response = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: None, + params: Some(result), + subscription: Some(sub_id.into()), + }; + debug!("--> {response}"); + conn.send(serde_json::to_value(response)?).await?; + } + } + } + }, + on_failure, + ); + + Ok(()) + } + + fn sanity_check(&self, request: serde_json::Value) -> SanityCheckResult { + let rpc_msg = match serde_json::from_value::(request) { + Ok(m) => m, + Err(_) => { + return SanityCheckResult::ErrRes(pack_err_res( + message::PARSE_ERROR_CODE, + FAILED_TO_PARSE_ERROR_MSG, + None, + )); + } + }; + debug!("<-- {rpc_msg}"); + + let srvc_method_str = rpc_msg.method.clone(); + let srvc_method: Vec<&str> = srvc_method_str.split('.').collect(); + if srvc_method.len() < 2 { + return SanityCheckResult::ErrRes(pack_err_res( + message::INVALID_REQUEST_ERROR_CODE, + INVALID_REQUEST_ERROR_MSG, + Some(rpc_msg.id), + )); + } + + let srvc_name = srvc_method[0].to_string(); + let method_name = srvc_method[1].to_string(); + + SanityCheckResult::NewReq(NewRequest { + srvc_name, + method_name, + msg: rpc_msg, + }) + } + + /// Handles a new request + async fn handle_request( + &self, + channel: ArcChannel, + msg: serde_json::Value, + ) -> message::Response { + let req = match self.sanity_check(msg) { + SanityCheckResult::NewReq(req) => req, + SanityCheckResult::ErrRes(res) => return res, + }; + + if req.msg.subscriber.is_some() { + match self.pubsub_services.get(&req.srvc_name) { + Some(s) => { + self.handle_pubsub_request(channel, s, &req.method_name, req.msg) + .await + } + None => pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(req.msg.id), + ), + } + } else { + match self.services.get(&req.srvc_name) { + Some(s) => self.handle_call_request(s, &req.method_name, req.msg).await, + None => pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(req.msg.id), + ), + } + } + } + + /// Handles a call request + async fn handle_call_request( + &self, + service: &Arc, + method_name: &str, + rpc_msg: message::Request, + ) -> message::Response { + let method = match service.get_method(method_name) { + Some(m) => m, + None => { + return pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(rpc_msg.id), + ); + } + }; + + let result = match method(rpc_msg.params.clone()).await { + Ok(res) => res, + Err(err) => return self.handle_error(err, rpc_msg.id), + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: None, + result: Some(result), + id: Some(rpc_msg.id), + subscription: None, + } + } + + /// Handles a pubsub request + async fn handle_pubsub_request( + &self, + channel: ArcChannel, + service: &Arc, + method_name: &str, + rpc_msg: message::Request, + ) -> message::Response { + let method = match service.get_pubsub_method(method_name) { + Some(m) => m, + None => { + return pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(rpc_msg.id), + ); + } + }; + + let result = match method(channel, rpc_msg.params.clone()).await { + Ok(res) => res, + Err(err) => return self.handle_error(err, rpc_msg.id), + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: None, + result: None, + id: Some(rpc_msg.id), + subscription: Some(result), + } + } + + fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response { + match err { + Error::ParseJSON(_) => pack_err_res( + message::PARSE_ERROR_CODE, + FAILED_TO_PARSE_ERROR_MSG, + Some(msg_id), + ), + Error::InvalidParams(msg) => { + pack_err_res(message::INVALID_PARAMS_ERROR_CODE, msg, Some(msg_id)) + } + Error::InvalidRequest(msg) => { + pack_err_res(message::INVALID_REQUEST_ERROR_CODE, msg, Some(msg_id)) + } + Error::RPCMethodError(code, msg) => pack_err_res(code, msg, Some(msg_id)), + _ => pack_err_res( + message::INTERNAL_ERROR_CODE, + INTERNAL_ERROR_MSG, + Some(msg_id), + ), + } + } +} + +/// Builder for constructing an RPC [`Server`]. +pub struct ServerBuilder { + endpoint: Endpoint, + tcp_config: TcpConfig, + tls_config: Option, + services: HashMap>, + pubsub_services: HashMap>, +} + +impl ServerBuilder { + /// Adds a new RPC service to the server. + pub fn service(mut self, service: Arc) -> Self { + self.services.insert(service.name(), service); + self + } + + /// Adds a new PubSub RPC service to the server. + pub fn pubsub_service(mut self, service: Arc) -> Self { + self.pubsub_services.insert(service.name(), service); + self + } + + /// Configure TCP settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tcp_config = TcpConfig::default(); + /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TCP protocols. + pub fn tcp_config(mut self, config: TcpConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tcp_config = config; + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Configure TLS settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tls_config = rustls::ServerConfig::new(...); + /// let server = Server::builder()?.tls_config(tls_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TLS protocols. + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Builds the server with the configured options. + pub async fn build(self) -> Result> { + self._build(TaskGroup::new()).await + } + + /// Builds the server with the configured options and an executor. + pub async fn build_with_executor(self, ex: Executor) -> Result> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result> { + let listener: Listener = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: self.tcp_config, + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, + ), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => { + let config = ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: None, + }; + Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) + } + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + pubsub_services: self.pubsub_services, + })) + } +} + +impl Server { + /// Creates a new [`ServerBuilder`] + /// + /// This function initializes a `ServerBuilder` with the specified endpoint. + /// + /// # Example + /// + /// ```ignore + /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + pubsub_services: HashMap::new(), + tcp_config: Default::default(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs new file mode 100644 index 0000000..5b4bf9a --- /dev/null +++ b/jsonrpc/src/server/pubsub_service.rs @@ -0,0 +1,67 @@ +use std::{future::Future, pin::Pin}; + +use crate::Result; + +use super::channel::ArcChannel; + +/// Represents the RPC method +pub type PubSubRPCMethod<'a> = + Box PubSubRPCMethodOutput<'a> + Send + 'a>; +type PubSubRPCMethodOutput<'a> = + Pin> + Send + Sync + 'a>>; + +/// Defines the interface for an RPC service. +pub trait PubSubRPCService: Sync + Send { + fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option; + fn name(&self) -> String; +} + +/// Implements the [`PubSubRPCService`] trait for a provided type. +/// +/// # Example +/// +/// ``` +/// use serde_json::Value; +/// +/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// +/// struct Hello {} +/// +/// impl Hello { +/// async fn foo(&self, params: Value) -> Result { +/// Ok(serde_json::json!("foo!")) +/// } +/// +/// async fn bar(&self, params: Value) -> Result { +/// Ok(serde_json::json!("bar!")) +/// } +/// } +/// +/// impl_rpc_service!(Hello, foo, bar); +/// +/// ``` +#[macro_export] +macro_rules! impl_pubsub_rpc_service { + ($t:ty, $($m:ident),*) => { + impl karyon_jsonrpc::PubSubRPCService for $t { + fn get_pubsub_method<'a>( + &'a self, + name: &'a str + ) -> Option { + match name { + $( + stringify!($m) => { + Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, params: serde_json::Value| Box::pin(self.$m(chan, params)))) + } + )* + _ => None, + } + + + } + fn name(&self) -> String{ + stringify!($t).to_string() + } + } + }; +} diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs new file mode 100644 index 0000000..4c8c4b8 --- /dev/null +++ b/jsonrpc/src/server/service.rs @@ -0,0 +1,64 @@ +use std::{future::Future, pin::Pin}; + +use crate::Result; + +/// Represents the RPC method +pub type RPCMethod<'a> = Box RPCMethodOutput<'a> + Send + 'a>; +type RPCMethodOutput<'a> = + Pin> + Send + Sync + 'a>>; + +/// Defines the interface for an RPC service. +pub trait RPCService: Sync + Send { + fn get_method<'a>(&'a self, name: &'a str) -> Option; + fn name(&self) -> String; +} + +/// Implements the [`RPCService`] trait for a provided type. +/// +/// # Example +/// +/// ``` +/// use serde_json::Value; +/// +/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// +/// struct Hello {} +/// +/// impl Hello { +/// async fn foo(&self, params: Value) -> Result { +/// Ok(serde_json::json!("foo!")) +/// } +/// +/// async fn bar(&self, params: Value) -> Result { +/// Ok(serde_json::json!("bar!")) +/// } +/// } +/// +/// impl_rpc_service!(Hello, foo, bar); +/// +/// ``` +#[macro_export] +macro_rules! impl_rpc_service { + ($t:ty, $($m:ident),*) => { + impl karyon_jsonrpc::RPCService for $t { + fn get_method<'a>( + &'a self, + name: &'a str + ) -> Option { + match name { + $( + stringify!($m) => { + Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) + } + )* + _ => None, + } + + + } + fn name(&self) -> String{ + stringify!($t).to_string() + } + } + }; +} -- cgit v1.2.3