From 028940fe3e0a87cdc421a6d07f1ecfb6c208b9d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 21 May 2024 02:20:45 +0200 Subject: jsonrpc: support pubsub --- Cargo.lock | 20 +- jsonrpc/Cargo.toml | 1 + jsonrpc/README.md | 62 +++- jsonrpc/examples/pubsub_client.rs | 47 ++++ jsonrpc/examples/pubsub_server.rs | 69 +++++ jsonrpc/examples/server.rs | 4 +- jsonrpc/examples/tokio_server/Cargo.lock | 1 + jsonrpc/examples/tokio_server/src/main.rs | 4 +- jsonrpc/jsonrpc_macro/src/lib.rs | 38 +++ jsonrpc/src/client.rs | 158 ----------- jsonrpc/src/client/mod.rs | 374 ++++++++++++++++++++++++ jsonrpc/src/error.rs | 18 ++ jsonrpc/src/lib.rs | 83 +----- jsonrpc/src/message.rs | 36 ++- jsonrpc/src/server.rs | 282 ------------------- jsonrpc/src/server/channel.rs | 69 +++++ jsonrpc/src/server/mod.rs | 454 ++++++++++++++++++++++++++++++ jsonrpc/src/server/pubsub_service.rs | 67 +++++ jsonrpc/src/server/service.rs | 64 +++++ jsonrpc/src/service.rs | 64 ----- 20 files changed, 1310 insertions(+), 605 deletions(-) create mode 100644 jsonrpc/examples/pubsub_client.rs create mode 100644 jsonrpc/examples/pubsub_server.rs delete mode 100644 jsonrpc/src/client.rs create mode 100644 jsonrpc/src/client/mod.rs delete mode 100644 jsonrpc/src/server.rs create mode 100644 jsonrpc/src/server/channel.rs create mode 100644 jsonrpc/src/server/mod.rs create mode 100644 jsonrpc/src/server/pubsub_service.rs create mode 100644 jsonrpc/src/server/service.rs delete mode 100644 jsonrpc/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index ee085da..708e50a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,12 +142,11 @@ dependencies = [ [[package]] name = "async-channel" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2776ead772134d55b62dd45e59a79e21612d85d0af729b8b7d3967d601a62a" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" dependencies = [ "concurrent-queue", - "event-listener 5.3.0", "event-listener-strategy 0.5.2", "futures-core", "pin-project-lite", @@ -183,7 +182,7 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-executor", "async-io 2.3.2", "async-lock 3.3.0", @@ -268,7 +267,7 @@ version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a53fc6301894e04a92cb2584fedde80cb25ba8e02d9dc39d4a87d036e22f397d" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-io 2.3.2", "async-lock 3.3.0", "async-signal", @@ -494,7 +493,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "495f7104e962b7356f0aeb34247aca1fe7d2e783b346582db7f2904cb5717e88" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-lock 3.3.0", "async-task", "futures-io", @@ -1234,7 +1233,7 @@ dependencies = [ name = "karyon_core" version = "0.1.0" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-process", "bincode", "chrono", @@ -1253,6 +1252,7 @@ dependencies = [ name = "karyon_jsonrpc" version = "0.1.0" dependencies = [ + "async-channel 2.3.1", "async-trait", "async-tungstenite", "env_logger", @@ -1286,7 +1286,7 @@ dependencies = [ name = "karyon_net" version = "0.1.0" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-trait", "async-tungstenite", "bincode", @@ -1307,7 +1307,7 @@ dependencies = [ name = "karyon_p2p" version = "0.1.0" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-std", "async-trait", "bincode", @@ -2055,7 +2055,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e635339259e51ef85ac7aa29a1cd991b957047507288697a690e80ab97d07cad" dependencies = [ - "async-channel 2.3.0", + "async-channel 2.3.1", "async-executor", "async-fs", "async-io 2.3.2", diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index be3176b..40779fe 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -43,6 +43,7 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" thiserror = "1.0.58" async-trait = "0.1.77" +async-channel = "2.3.1" async-tungstenite = { version = "0.25.0", default-features = false, optional = true } diff --git a/jsonrpc/README.md b/jsonrpc/README.md index 0883bec..3322671 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -5,10 +5,13 @@ A fast and lightweight async implementation of [JSON-RPC features: - Supports TCP, TLS, WebSocket, and Unix protocols. -- Uses smol(async-std) as the async runtime, but also supports tokio via the +- Uses `smol`(async-std) as the async runtime, but also supports `tokio` via the `tokio` feature. - Allows registration of multiple services (structs) of different types on a single server. +- Supports pub/sub +- Allows passing an `async_executors::Executor` or tokio's `Runtime` when building + the server. ## Example @@ -16,8 +19,11 @@ features: use std::sync::Arc; use serde_json::Value; +use smol::stream::StreamExt; -use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; +use karyon_jsonrpc::{ + Error, Server, Client, rpc_impl, rpc_pubsub_impl, SubscriptionID, ArcChannel +}; struct HelloWorld {} @@ -37,12 +43,42 @@ impl HelloWorld { } } +#[rpc_pubsub_impl] +impl HelloWorld { + async fn log_subscribe(&self, chan: ArcChannel, _params: Value) -> Result { + let sub = chan.new_subscription().await; + let sub_id = sub.id.clone(); + smol::spawn(async move { + loop { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + if let Err(err) = sub.notify(serde_json::json!("Hello")).await { + println!("Error send notification {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe(&self, chan: ArcChannel, params: Value) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + + // Server async { + let service = Arc::new(HelloWorld {}); // Creates a new server + let server = Server::builder("tcp://127.0.0.1:60000") .expect("create new server builder") - .service(HelloWorld{}) + .service(service.clone()) + .pubsub_service(service) .build() .await .expect("build the server"); @@ -63,6 +99,26 @@ async { let result: String = client.call("HelloWorld.say_hello", "world".to_string()) .await .expect("send a request"); + + let (sub_id, sub) = client + .subscribe("Calc.log_subscribe", ()) + .await + .expect("Subscribe to log_subscribe method"); + + smol::spawn(async move { + sub.for_each(|m| { + println!("Receive new notification: {m}"); + }) + .await + }) + .detach(); + + smol::Timer::after(std::time::Duration::from_secs(5)).await; + + client + .unsubscribe("Calc.log_unsubscribe", sub_id) + .await + .expect("Unsubscribe from log_unsubscirbe method"); }; ``` diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs new file mode 100644 index 0000000..fee2a26 --- /dev/null +++ b/jsonrpc/examples/pubsub_client.rs @@ -0,0 +1,47 @@ +use serde::{Deserialize, Serialize}; +use smol::stream::StreamExt; + +use karyon_jsonrpc::Client; + +#[derive(Deserialize, Serialize, Debug)] +struct Pong {} + +fn main() { + env_logger::init(); + smol::future::block_on(async { + let client = Client::builder("tcp://127.0.0.1:6000") + .expect("Create client builder") + .build() + .await + .expect("Build a client"); + + let result: Pong = client + .call("Calc.ping", ()) + .await + .expect("Send ping request"); + + println!("receive pong msg: {:?}", result); + + let (sub_id, sub) = client + .subscribe("Calc.log_subscribe", ()) + .await + .expect("Subscribe to log_subscribe method"); + + smol::spawn(async move { + sub.for_each(|m| { + println!("Receive new notification: {m}"); + }) + .await + }) + .detach(); + + smol::Timer::after(std::time::Duration::from_secs(5)).await; + + client + .unsubscribe("Calc.log_unsubscribe", sub_id) + .await + .expect("Unsubscribe from log_unsubscirbe method"); + + smol::Timer::after(std::time::Duration::from_secs(2)).await; + }); +} diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs new file mode 100644 index 0000000..739e6d5 --- /dev/null +++ b/jsonrpc/examples/pubsub_server.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, SubscriptionID}; + +struct Calc {} + +#[derive(Deserialize, Serialize)] +struct Req { + x: u32, + y: u32, +} + +#[derive(Deserialize, Serialize)] +struct Pong {} + +#[rpc_impl] +impl Calc { + async fn ping(&self, _params: Value) -> Result { + Ok(serde_json::json!(Pong {})) + } +} + +#[rpc_pubsub_impl] +impl Calc { + async fn log_subscribe(&self, chan: ArcChannel, _params: Value) -> Result { + let sub = chan.new_subscription().await; + let sub_id = sub.id.clone(); + smol::spawn(async move { + loop { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + if let Err(err) = sub.notify(serde_json::json!("Hello")).await { + println!("Error send notification {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe(&self, chan: ArcChannel, params: Value) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + +fn main() { + env_logger::init(); + smol::block_on(async { + let calc = Arc::new(Calc {}); + + // Creates a new server + let server = Server::builder("tcp://127.0.0.1:6000") + .expect("Create a new server builder") + .service(calc.clone()) + .pubsub_service(calc) + .build() + .await + .expect("Build a new server"); + + // Start the server + server.start().await.expect("Start the server"); + }); +} diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 841e276..5b951cd 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -48,7 +50,7 @@ fn main() { // Creates a new server let server = Server::builder("tcp://127.0.0.1:6000") .expect("Create a new server builder") - .service(calc) + .service(Arc::new(calc)) .build() .await .expect("start a new server"); diff --git a/jsonrpc/examples/tokio_server/Cargo.lock b/jsonrpc/examples/tokio_server/Cargo.lock index a7fdb0b..ab39fcd 100644 --- a/jsonrpc/examples/tokio_server/Cargo.lock +++ b/jsonrpc/examples/tokio_server/Cargo.lock @@ -681,6 +681,7 @@ dependencies = [ name = "karyon_jsonrpc" version = "0.1.0" dependencies = [ + "async-channel", "async-trait", "async-tungstenite", "karyon_core", diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 978c90a..ce77cd3 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -48,7 +50,7 @@ async fn main() { // Creates a new server let server = Server::builder("tcp://127.0.0.1:6000") .expect("Create a new server builder") - .service(calc) + .service(Arc::new(calc)) .build() .await .expect("start a new server"); diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs index c3d51e8..5acfa5e 100644 --- a/jsonrpc/jsonrpc_macro/src/lib.rs +++ b/jsonrpc/jsonrpc_macro/src/lib.rs @@ -45,3 +45,41 @@ pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { quoted.into() } + +// TODO remove duplicate code +#[proc_macro_attribute] +pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let mut methods: Vec = vec![]; + + let item2 = item.clone(); + let parsed_input = parse_macro_input!(item2 as ItemImpl); + + let self_ty = match *parsed_input.self_ty { + Type::Path(p) => p, + _ => err!( + parsed_input.span(), + "implementing the trait `RPCService` on this type is unsupported" + ), + }; + + if parsed_input.items.is_empty() { + err!(self_ty.span(), "At least one method should be implemented"); + } + + for item in parsed_input.items { + match item { + ImplItem::Method(method) => { + methods.push(method.sig.ident); + } + _ => err!(item.span(), "unexpected item"), + } + } + + let item2: TokenStream2 = item.into(); + let quoted = quote! { + karyon_jsonrpc::impl_pubsub_rpc_service!(#self_ty, #(#methods),*); + #item2 + }; + + quoted.into() +} diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs deleted file mode 100644 index b55943e..0000000 --- a/jsonrpc/src/client.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::time::Duration; - -use log::debug; -use serde::{de::DeserializeOwned, Serialize}; - -#[cfg(feature = "smol")] -use futures_rustls::rustls; -#[cfg(feature = "tokio")] -use tokio_rustls::rustls; - -use karyon_core::{async_util::timeout, util::random_32}; -use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; - -#[cfg(feature = "ws")] -use karyon_net::ws::{ClientWsConfig, ClientWssConfig}; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - -use crate::{codec::JsonCodec, message, Error, Result}; - -/// Represents an RPC client -pub struct Client { - conn: Conn, - timeout: Option, -} - -impl Client { - /// Calls the provided method, waits for the response, and returns the result. - pub async fn call( - &self, - method: &str, - params: T, - ) -> Result { - let id = serde_json::json!(random_32()); - - let request = message::Request { - jsonrpc: message::JSONRPC_VERSION.to_string(), - id, - method: method.to_string(), - params: serde_json::json!(params), - }; - - let req_json = serde_json::to_value(&request)?; - match self.timeout { - Some(s) => { - let dur = Duration::from_secs(s); - timeout(dur, self.conn.send(req_json)).await??; - } - None => { - self.conn.send(req_json).await?; - } - } - debug!("--> {request}"); - - let msg = self.conn.recv().await?; - let response = serde_json::from_value::(msg)?; - debug!("<-- {response}"); - - if response.id.is_none() || response.id.unwrap() != request.id { - return Err(Error::InvalidMsg("Invalid response id")); - } - - if let Some(error) = response.error { - return Err(Error::CallError(error.code, error.message)); - } - - match response.result { - Some(result) => Ok(serde_json::from_value::(result)?), - None => Err(Error::InvalidMsg("Invalid response result")), - } - } -} - -pub struct ClientBuilder { - endpoint: Endpoint, - tls_config: Option<(rustls::ClientConfig, String)>, - timeout: Option, -} - -impl ClientBuilder { - pub fn with_timeout(mut self, timeout: u64) -> Self { - self.timeout = Some(timeout); - self - } - - pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tls_config = Some((config, dns_name.to_string())); - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - pub async fn build(self) -> Result { - let conn: Conn = match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::tls::dial( - &self.endpoint, - ClientTlsConfig { - dns_name, - client_config: conf, - tcp_config: Default::default(), - }, - JsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, - ), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { - Some((conf, dns_name)) => Box::new( - karyon_net::ws::dial( - &self.endpoint, - ClientWsConfig { - tcp_config: Default::default(), - wss_config: Some(ClientWssConfig { - dns_name, - client_config: conf, - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {}) - .await?, - ), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new( - karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, - ), - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - Ok(Client { - timeout: self.timeout, - conn, - }) - } -} -impl Client { - pub fn builder(endpoint: impl ToEndpoint) -> Result { - let endpoint = endpoint.to_endpoint()?; - Ok(ClientBuilder { - endpoint, - timeout: None, - tls_config: None, - }) - } -} diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs new file mode 100644 index 0000000..c9253fc --- /dev/null +++ b/jsonrpc/src/client/mod.rs @@ -0,0 +1,374 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use log::{debug, error, warn}; +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::json; + +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use karyon_core::{ + async_runtime::lock::Mutex, + async_util::{timeout, TaskGroup, TaskResult}, + util::random_32, +}; +use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; + +#[cfg(feature = "ws")] +use karyon_net::ws::{ClientWsConfig, ClientWssConfig}; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; + +use crate::{codec::JsonCodec, message, Error, Result, SubscriptionID, TcpConfig}; + +const CHANNEL_CAP: usize = 10; + +const DEFAULT_TIMEOUT: u64 = 1000; // 1s + +/// Type alias for a subscription to receive notifications. +/// +/// The receiver channel is returned by the `subscribe` method to receive +/// notifications from the server. +pub type Subscription = async_channel::Receiver; + +/// Represents an RPC client +pub struct Client { + conn: Conn, + chan_tx: async_channel::Sender, + chan_rx: async_channel::Receiver, + timeout: Option, + subscriptions: Mutex>>, + task_group: TaskGroup, +} + +impl Client { + /// Calls the provided method, waits for the response, and returns the result. + pub async fn call( + &self, + method: &str, + params: T, + ) -> Result { + let request = self.send_request(method, params, None).await?; + debug!("--> {request}"); + + let response = self.chan_rx.recv().await?; + debug!("<-- {response}"); + + if let Some(error) = response.error { + return Err(Error::CallError(error.code, error.message)); + } + + if response.id.is_none() || response.id.unwrap() != request.id { + return Err(Error::InvalidMsg("Invalid response id")); + } + + match response.result { + Some(result) => Ok(serde_json::from_value::(result)?), + None => Err(Error::InvalidMsg("Invalid response result")), + } + } + + /// Subscribes to the provided method, waits for the response, and returns the result. + /// + /// This function sends a subscription request to the specified method + /// with the given parameters. It waits for the response and returns a + /// tuple containing a `SubscriptionID` and a `Subscription` (channel receiver). + pub async fn subscribe( + &self, + method: &str, + params: T, + ) -> Result<(SubscriptionID, Subscription)> { + let request = self.send_request(method, params, Some(json!(true))).await?; + debug!("--> {request}"); + + let response = self.chan_rx.recv().await?; + debug!("<-- {response}"); + + if let Some(error) = response.error { + return Err(Error::SubscribeError(error.code, error.message)); + } + + if response.id.is_none() || response.id.unwrap() != request.id { + return Err(Error::InvalidMsg("Invalid response id")); + } + + let sub_id = match response.subscription { + Some(result) => serde_json::from_value::(result)?, + None => return Err(Error::InvalidMsg("Invalid subscription id")), + }; + + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + self.subscriptions.lock().await.insert(sub_id, ch_tx); + + Ok((sub_id, ch_rx)) + } + + /// Unsubscribes from the provided method, waits for the response, and returns the result. + /// + /// This function sends an unsubscription request for the specified method + /// and subscription ID. It waits for the response to confirm the unsubscription. + pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { + let request = self + .send_request(method, json!(sub_id), Some(json!(true))) + .await?; + debug!("--> {request}"); + + let response = self.chan_rx.recv().await?; + debug!("<-- {response}"); + + if let Some(error) = response.error { + return Err(Error::SubscribeError(error.code, error.message)); + } + + if response.id.is_none() || response.id.unwrap() != request.id { + return Err(Error::InvalidMsg("Invalid response id")); + } + + self.subscriptions.lock().await.remove(&sub_id); + Ok(()) + } + + async fn send_request( + &self, + method: &str, + params: T, + subscriber: Option, + ) -> Result { + let id = json!(random_32()); + + let request = message::Request { + jsonrpc: message::JSONRPC_VERSION.to_string(), + id, + method: method.to_string(), + params: json!(params), + subscriber, + }; + + let req_json = serde_json::to_value(&request)?; + + match self.timeout { + Some(s) => { + let dur = Duration::from_millis(s); + timeout(dur, self.conn.send(req_json)).await??; + } + None => { + self.conn.send(req_json).await?; + } + } + + Ok(request) + } + + fn start_background_receiving(self: &Arc) { + let selfc = self.clone(); + let on_failure = |result: TaskResult>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("background receiving stopped: {err}"); + } + // drop all subscription channels + selfc.subscriptions.lock().await.clear(); + }; + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + let msg = selfc.conn.recv().await?; + + if let Ok(res) = serde_json::from_value::(msg.clone()) { + selfc.chan_tx.send(res).await?; + continue; + } + + if let Ok(nt) = serde_json::from_value::(msg.clone()) { + let sub_id = match nt.subscription.clone() { + Some(id) => serde_json::from_value::(id)?, + None => { + return Err(Error::InvalidMsg( + "Invalid notification msg: subscription id not found", + )) + } + }; + + match selfc.subscriptions.lock().await.get(&sub_id) { + Some(s) => { + s.send(nt.params.unwrap_or(json!(""))).await?; + continue; + } + None => { + warn!("Receive unknown notification {sub_id}"); + continue; + } + } + } + + error!("Receive unexpected msg: {msg}"); + return Err(Error::InvalidMsg("Unexpected msg")); + } + }, + on_failure, + ); + } +} + +/// Builder for constructing an RPC [`Client`]. +pub struct ClientBuilder { + endpoint: Endpoint, + tls_config: Option<(rustls::ClientConfig, String)>, + tcp_config: TcpConfig, + timeout: Option, +} + +impl ClientBuilder { + /// Set timeout for requests, in milliseconds. + /// + /// # Examples + /// + /// ```ignore + /// let client = Client::builder()?.set_timeout(5000).build()?; + /// ``` + pub fn set_timeout(mut self, timeout: u64) -> Self { + self.timeout = Some(timeout); + self + } + + /// Configure TCP settings for the client. + /// + /// # Example + /// + /// ```ignore + /// let tcp_config = TcpConfig::default(); + /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TCP protocols. + pub fn tcp_config(mut self, config: TcpConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tcp_config = config; + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Configure TLS settings for the client. + /// + /// # Example + /// + /// ```ignore + /// let tls_config = rustls::ClientConfig::new(...); + /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TLS protocols. + pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some((config, dns_name.to_string())); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Build RPC client from [`ClientBuilder`]. + /// + /// This function creates a new RPC client using the configurations + /// specified in the `ClientBuilder`. It returns a `Arc` on success. + /// + /// # Example + /// + /// ```ignore + /// let client = Client::builder(endpoint)? + /// .set_timeout(5000) + /// .tcp_config(tcp_config)? + /// .tls_config(tls_config, "example.com")? + /// .build() + /// .await?; + /// ``` + pub async fn build(self) -> Result> { + let conn: Conn = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.endpoint, + ClientTlsConfig { + dns_name, + client_config: conf, + tcp_config: self.tcp_config, + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?, + ), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &self.endpoint, + ClientWsConfig { + tcp_config: self.tcp_config, + wss_config: Some(ClientWssConfig { + dns_name, + client_config: conf, + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => { + let config = ClientWsConfig { + tcp_config: self.tcp_config, + wss_config: None, + }; + Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?) + } + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new( + karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + let (tx, rx) = async_channel::bounded(CHANNEL_CAP); + let client = Arc::new(Client { + timeout: self.timeout, + conn, + chan_tx: tx, + chan_rx: rx, + subscriptions: Mutex::new(HashMap::new()), + task_group: TaskGroup::new(), + }); + client.start_background_receiving(); + Ok(client) + } +} +impl Client { + /// Creates a new [`ClientBuilder`] + /// + /// This function initializes a `ClientBuilder` with the specified endpoint. + /// + /// # Example + /// + /// ```ignore + /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ClientBuilder { + endpoint, + timeout: Some(DEFAULT_TIMEOUT), + tls_config: None, + tcp_config: Default::default(), + }) + } +} diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 7f89729..d68e169 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("Call Error: code: {0} msg: {1}")] CallError(i32, String), + #[error("Subscribe Error: code: {0} msg: {1}")] + SubscribeError(i32, String), + #[error("RPC Method Error: code: {0} msg: {1}")] RPCMethodError(i32, &'static str), @@ -29,6 +32,15 @@ pub enum Error { #[error("Unsupported protocol: {0}")] UnsupportedProtocol(String), + #[error("Subscription not found: {0}")] + SubscriptionNotFound(String), + + #[error(transparent)] + ChannelRecv(#[from] async_channel::RecvError), + + #[error("Channel broadcast Error: {0}")] + ChannelBroadcast(String), + #[error("Unexpected Error: {0}")] General(&'static str), @@ -38,3 +50,9 @@ pub enum Error { #[error(transparent)] KaryonNet(#[from] karyon_net::Error), } + +impl From> for Error { + fn from(error: async_channel::SendError) -> Self { + Error::ChannelBroadcast(error.to_string()) + } +} diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 187b1ad..7573c4d 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,81 +1,20 @@ -//! A fast and lightweight async implementation of [JSON-RPC -//! 2.0](https://www.jsonrpc.org/specification). -//! -//! features: -//! - Supports TCP, TLS, WebSocket, and Unix protocols. -//! - Uses smol(async-std) as the async runtime, but also supports tokio via -//! the `tokio` feature. -//! - Allows registration of multiple services (structs) of different types on a -//! single server. -//! -//! # Example -//! -//! ``` -//! use std::sync::Arc; -//! -//! use serde_json::Value; -//! -//! use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; -//! -//! struct HelloWorld {} -//! -//! #[rpc_impl] -//! impl HelloWorld { -//! async fn say_hello(&self, params: Value) -> Result { -//! let msg: String = serde_json::from_value(params)?; -//! Ok(serde_json::json!(format!("Hello {msg}!"))) -//! } -//! -//! async fn foo(&self, params: Value) -> Result { -//! Ok(serde_json::json!("foo!")) -//! } -//! -//! async fn bar(&self, params: Value) -> Result { -//! Ok(serde_json::json!("bar!")) -//! } -//! } -//! -//! // Server -//! async { -//! // Creates a new server -//! let server = Server::builder("tcp://127.0.0.1:60000") -//! .expect("create new server builder") -//! .service(HelloWorld{}) -//! .build() -//! .await -//! .expect("build the server"); -//! -//! // Starts the server -//! server.start().await.expect("start the server"); -//! }; -//! -//! // Client -//! async { -//! // Creates a new client -//! let client = Client::builder("tcp://127.0.0.1:60000") -//! .expect("create new client builder") -//! .build() -//! .await -//! .expect("build the client"); -//! -//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()) -//! .await -//! .expect("send a request"); -//! }; -//! -//! ``` +#![doc = include_str!("../README.md")] mod client; mod codec; mod error; pub mod message; mod server; -mod service; pub use client::Client; -pub use server::Server; - pub use error::{Error, Result}; -pub use karyon_jsonrpc_macro::rpc_impl; -pub use karyon_net::Endpoint; -pub use service::{RPCMethod, RPCService}; +pub use server::{ + channel::{ArcChannel, Channel, Subscription, SubscriptionID}, + pubsub_service::{PubSubRPCMethod, PubSubRPCService}, + service::{RPCMethod, RPCService}, + Server, +}; + +pub use karyon_jsonrpc_macro::{rpc_impl, rpc_pubsub_impl}; + +pub use karyon_net::{tcp::TcpConfig, Endpoint}; diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index f4bf490..9c89362 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -23,9 +23,12 @@ pub struct Request { pub method: String, pub params: serde_json::Value, pub id: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + pub subscriber: Option, } #[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] pub struct Response { pub jsonrpc: String, #[serde(skip_serializing_if = "Option::is_none")] @@ -34,30 +37,35 @@ pub struct Response { pub error: Option, #[serde(skip_serializing_if = "Option::is_none")] pub id: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Error { - pub code: i32, - pub message: String, #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, + pub subscription: Option, } #[derive(Debug, Serialize, Deserialize)] pub struct Notification { pub jsonrpc: String, - pub method: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub method: Option, #[serde(skip_serializing_if = "Option::is_none")] pub params: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub subscription: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Error { + pub code: i32, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, } impl std::fmt::Display for Request { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}}}", - self.jsonrpc, self.method, self.params, self.id + "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}, subscribe: {:?}}}", + self.jsonrpc, self.method, self.params, self.id, self.subscriber ) } } @@ -66,8 +74,8 @@ impl std::fmt::Display for Response { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{{jsonrpc: {}, result': {:?}, error: {:?} , id: {:?}}}", - self.jsonrpc, self.result, self.error, self.id + "{{jsonrpc: {}, result': {:?}, error: {:?} , id: {:?}, subscription: {:?}}}", + self.jsonrpc, self.result, self.error, self.id, self.subscription ) } } @@ -86,8 +94,8 @@ impl std::fmt::Display for Notification { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{{jsonrpc: {}, method: {}, params: {:?}}}", - self.jsonrpc, self.method, self.params + "{{jsonrpc: {}, method: {:?}, params: {:?}, subscription: {:?}}}", + self.jsonrpc, self.method, self.params, self.subscription ) } } diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs deleted file mode 100644 index 2155295..0000000 --- a/jsonrpc/src/server.rs +++ /dev/null @@ -1,282 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use log::{debug, error, warn}; - -#[cfg(feature = "smol")] -use futures_rustls::rustls; -#[cfg(feature = "tokio")] -use tokio_rustls::rustls; - -use karyon_core::async_runtime::Executor; -use karyon_core::async_util::{TaskGroup, TaskResult}; - -use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; - -#[cfg(feature = "ws")] -use crate::codec::WsJsonCodec; - -use crate::{codec::JsonCodec, message, Error, RPCService, Result}; - -pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; -pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; -pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; -pub const INTERNAL_ERROR_MSG: &str = "Internal error"; - -fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { - let err = message::Error { - code, - message: msg.to_string(), - data: None, - }; - - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: Some(err), - result: None, - id, - } -} - -/// Represents an RPC server -pub struct Server { - listener: Listener, - task_group: TaskGroup, - services: HashMap>, -} - -impl Server { - /// Returns the local endpoint. - pub fn local_endpoint(&self) -> Result { - self.listener.local_endpoint().map_err(Error::from) - } - - /// Starts the RPC server - pub async fn start(self: Arc) -> Result<()> { - loop { - match self.listener.accept().await { - Ok(conn) => { - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") - } - } - Err(err) => { - error!("Failed to accept a new conn: {err}") - } - } - } - } - - /// Shuts down the RPC server - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - /// Handles a new connection - async fn handle_conn(self: &Arc, conn: Conn) -> Result<()> { - let endpoint = conn.peer_endpoint().expect("get peer endpoint"); - debug!("Handle a new connection {endpoint}"); - - let on_failure = |result: TaskResult>| async move { - if let TaskResult::Completed(Err(err)) = result { - error!("Connection {} dropped: {}", endpoint, err); - } else { - warn!("Connection {} dropped", endpoint); - } - }; - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let msg = conn.recv().await?; - let response = selfc.handle_request(msg).await; - let response = serde_json::to_value(response)?; - debug!("--> {response}"); - conn.send(response).await?; - } - }, - on_failure, - ); - - Ok(()) - } - - /// Handles a request - async fn handle_request(&self, msg: serde_json::Value) -> message::Response { - let rpc_msg = match serde_json::from_value::(msg) { - Ok(m) => m, - Err(_) => { - return pack_err_res(message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG, None); - } - }; - debug!("<-- {rpc_msg}"); - - let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect(); - if srvc_method.len() != 2 { - return pack_err_res( - message::INVALID_REQUEST_ERROR_CODE, - INVALID_REQUEST_ERROR_MSG, - Some(rpc_msg.id), - ); - } - - let srvc_name = srvc_method[0]; - let method_name = srvc_method[1]; - - let service = match self.services.get(srvc_name) { - Some(s) => s, - None => { - return pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(rpc_msg.id), - ); - } - }; - - let method = match service.get_method(method_name) { - Some(m) => m, - None => { - return pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(rpc_msg.id), - ); - } - }; - - let result = match method(rpc_msg.params.clone()).await { - Ok(res) => res, - Err(Error::ParseJSON(_)) => { - return pack_err_res( - message::PARSE_ERROR_CODE, - FAILED_TO_PARSE_ERROR_MSG, - Some(rpc_msg.id), - ); - } - Err(Error::InvalidParams(msg)) => { - return pack_err_res(message::INVALID_PARAMS_ERROR_CODE, msg, Some(rpc_msg.id)); - } - Err(Error::InvalidRequest(msg)) => { - return pack_err_res(message::INVALID_REQUEST_ERROR_CODE, msg, Some(rpc_msg.id)); - } - Err(Error::RPCMethodError(code, msg)) => { - return pack_err_res(code, msg, Some(rpc_msg.id)); - } - Err(_) => { - return pack_err_res( - message::INTERNAL_ERROR_CODE, - INTERNAL_ERROR_MSG, - Some(rpc_msg.id), - ); - } - }; - - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: None, - result: Some(result), - id: Some(rpc_msg.id), - } - } -} - -pub struct ServerBuilder { - endpoint: Endpoint, - tls_config: Option, - services: HashMap>, -} - -impl ServerBuilder { - pub fn service(mut self, service: impl RPCService + 'static) -> Self { - self.services.insert(service.name(), Box::new(service)); - self - } - - pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { - match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { - self.tls_config = Some(config); - Ok(self) - } - _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - } - } - - pub async fn build(self) -> Result> { - self._build(TaskGroup::new()).await - } - - pub async fn build_with_executor(self, ex: Executor) -> Result> { - self._build(TaskGroup::with_executor(ex)).await - } - - async fn _build(self, task_group: TaskGroup) -> Result> { - let listener: Listener = match self.endpoint { - Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::tls::listen( - &self.endpoint, - karyon_net::tls::ServerTlsConfig { - server_config: conf.clone(), - tcp_config: Default::default(), - }, - JsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::tcp::listen(&self.endpoint, Default::default(), JsonCodec {}) - .await?, - ), - }, - #[cfg(feature = "ws")] - Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { - Some(conf) => Box::new( - karyon_net::ws::listen( - &self.endpoint, - karyon_net::ws::ServerWsConfig { - tcp_config: Default::default(), - wss_config: Some(karyon_net::ws::ServerWssConfig { - server_config: conf.clone(), - }), - }, - WsJsonCodec {}, - ) - .await?, - ), - None => Box::new( - karyon_net::ws::listen(&self.endpoint, Default::default(), WsJsonCodec {}) - .await?, - ), - }, - #[cfg(all(feature = "unix", target_family = "unix"))] - Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( - &self.endpoint, - Default::default(), - JsonCodec {}, - )?), - - _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), - }; - - Ok(Arc::new(Server { - listener, - task_group, - services: self.services, - })) - } -} - -impl ServerBuilder {} - -impl Server { - pub fn builder(endpoint: impl ToEndpoint) -> Result { - let endpoint = endpoint.to_endpoint()?; - Ok(ServerBuilder { - endpoint, - services: HashMap::new(), - tls_config: None, - }) - } -} diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs new file mode 100644 index 0000000..1498825 --- /dev/null +++ b/jsonrpc/src/server/channel.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; + +use karyon_core::{async_runtime::lock::Mutex, util::random_32}; + +use crate::{Error, Result}; + +pub type SubscriptionID = u32; +pub type ArcChannel = Arc; + +/// Represents a new subscription +pub struct Subscription { + pub id: SubscriptionID, + parent: Arc, + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, +} + +impl Subscription { + /// Creates a new `Subscription` + fn new( + parent: Arc, + id: SubscriptionID, + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + ) -> Self { + Self { parent, id, chan } + } + + /// Sends a notification to the subscriber + pub async fn notify(&self, res: serde_json::Value) -> Result<()> { + if self.parent.subs.lock().await.contains(&self.id) { + self.chan.send((self.id, res)).await?; + Ok(()) + } else { + Err(Error::SubscriptionNotFound(self.id.to_string())) + } + } +} + +/// Represents a channel for creating/removing subscriptions +pub struct Channel { + chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + subs: Mutex>, +} + +impl Channel { + /// Creates a new `Channel` + pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel { + Arc::new(Self { + chan, + subs: Mutex::new(Vec::new()), + }) + } + + /// Creates a new subscription + pub async fn new_subscription(self: &Arc) -> Subscription { + let sub_id = random_32(); + let sub = Subscription::new(self.clone(), sub_id, self.chan.clone()); + self.subs.lock().await.push(sub_id); + sub + } + + /// Removes a subscription + pub async fn remove_subscription(self: &Arc, id: &SubscriptionID) { + let i = match self.subs.lock().await.iter().position(|i| i == id) { + Some(i) => i, + None => return, + }; + self.subs.lock().await.remove(i); + } +} diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs new file mode 100644 index 0000000..4ebab10 --- /dev/null +++ b/jsonrpc/src/server/mod.rs @@ -0,0 +1,454 @@ +pub mod channel; +pub mod pubsub_service; +pub mod service; + +use std::{collections::HashMap, sync::Arc}; + +use log::{debug, error, warn}; + +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use karyon_core::{ + async_runtime::Executor, + async_util::{select, Either, TaskGroup, TaskResult}, +}; + +use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; + +#[cfg(feature = "ws")] +use crate::codec::WsJsonCodec; + +#[cfg(feature = "ws")] +use karyon_net::ws::ServerWsConfig; + +use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result, TcpConfig}; + +use channel::{ArcChannel, Channel}; + +const CHANNEL_CAP: usize = 10; + +pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; +pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; +pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; +pub const INTERNAL_ERROR_MSG: &str = "Internal error"; + +fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { + let err = message::Error { + code, + message: msg.to_string(), + data: None, + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: Some(err), + result: None, + id, + subscription: None, + } +} + +struct NewRequest { + srvc_name: String, + method_name: String, + msg: message::Request, +} + +enum SanityCheckResult { + NewReq(NewRequest), + ErrRes(message::Response), +} + +/// Represents an RPC server +pub struct Server { + listener: Listener, + task_group: TaskGroup, + services: HashMap>, + pubsub_services: HashMap>, +} + +impl Server { + /// Returns the local endpoint. + pub fn local_endpoint(&self) -> Result { + self.listener.local_endpoint().map_err(Error::from) + } + + /// Starts the RPC server + pub async fn start(self: Arc) -> Result<()> { + loop { + match self.listener.accept().await { + Ok(conn) => { + if let Err(err) = self.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } + } + } + } + + /// Shuts down the RPC server + pub async fn shutdown(&self) { + self.task_group.cancel().await; + } + + /// Handles a new connection + async fn handle_conn(self: &Arc, conn: Conn) -> Result<()> { + let endpoint = conn.peer_endpoint().expect("get peer endpoint"); + debug!("Handle a new connection {endpoint}"); + + let on_failure = |result: TaskResult>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Connection {} dropped: {}", endpoint, err); + } else { + warn!("Connection {} dropped", endpoint); + } + }; + + let selfc = self.clone(); + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let channel = Channel::new(ch_tx); + self.task_group.spawn( + async move { + loop { + match select(conn.recv(), ch_rx.recv()).await { + Either::Left(msg) => { + // TODO spawn a task + let response = selfc.handle_request(channel.clone(), msg?).await; + debug!("--> {response}"); + conn.send(serde_json::to_value(response)?).await?; + } + Either::Right(msg) => { + let (sub_id, result) = msg?; + let response = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: None, + params: Some(result), + subscription: Some(sub_id.into()), + }; + debug!("--> {response}"); + conn.send(serde_json::to_value(response)?).await?; + } + } + } + }, + on_failure, + ); + + Ok(()) + } + + fn sanity_check(&self, request: serde_json::Value) -> SanityCheckResult { + let rpc_msg = match serde_json::from_value::(request) { + Ok(m) => m, + Err(_) => { + return SanityCheckResult::ErrRes(pack_err_res( + message::PARSE_ERROR_CODE, + FAILED_TO_PARSE_ERROR_MSG, + None, + )); + } + }; + debug!("<-- {rpc_msg}"); + + let srvc_method_str = rpc_msg.method.clone(); + let srvc_method: Vec<&str> = srvc_method_str.split('.').collect(); + if srvc_method.len() < 2 { + return SanityCheckResult::ErrRes(pack_err_res( + message::INVALID_REQUEST_ERROR_CODE, + INVALID_REQUEST_ERROR_MSG, + Some(rpc_msg.id), + )); + } + + let srvc_name = srvc_method[0].to_string(); + let method_name = srvc_method[1].to_string(); + + SanityCheckResult::NewReq(NewRequest { + srvc_name, + method_name, + msg: rpc_msg, + }) + } + + /// Handles a new request + async fn handle_request( + &self, + channel: ArcChannel, + msg: serde_json::Value, + ) -> message::Response { + let req = match self.sanity_check(msg) { + SanityCheckResult::NewReq(req) => req, + SanityCheckResult::ErrRes(res) => return res, + }; + + if req.msg.subscriber.is_some() { + match self.pubsub_services.get(&req.srvc_name) { + Some(s) => { + self.handle_pubsub_request(channel, s, &req.method_name, req.msg) + .await + } + None => pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(req.msg.id), + ), + } + } else { + match self.services.get(&req.srvc_name) { + Some(s) => self.handle_call_request(s, &req.method_name, req.msg).await, + None => pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(req.msg.id), + ), + } + } + } + + /// Handles a call request + async fn handle_call_request( + &self, + service: &Arc, + method_name: &str, + rpc_msg: message::Request, + ) -> message::Response { + let method = match service.get_method(method_name) { + Some(m) => m, + None => { + return pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(rpc_msg.id), + ); + } + }; + + let result = match method(rpc_msg.params.clone()).await { + Ok(res) => res, + Err(err) => return self.handle_error(err, rpc_msg.id), + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: None, + result: Some(result), + id: Some(rpc_msg.id), + subscription: None, + } + } + + /// Handles a pubsub request + async fn handle_pubsub_request( + &self, + channel: ArcChannel, + service: &Arc, + method_name: &str, + rpc_msg: message::Request, + ) -> message::Response { + let method = match service.get_pubsub_method(method_name) { + Some(m) => m, + None => { + return pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(rpc_msg.id), + ); + } + }; + + let result = match method(channel, rpc_msg.params.clone()).await { + Ok(res) => res, + Err(err) => return self.handle_error(err, rpc_msg.id), + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: None, + result: None, + id: Some(rpc_msg.id), + subscription: Some(result), + } + } + + fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response { + match err { + Error::ParseJSON(_) => pack_err_res( + message::PARSE_ERROR_CODE, + FAILED_TO_PARSE_ERROR_MSG, + Some(msg_id), + ), + Error::InvalidParams(msg) => { + pack_err_res(message::INVALID_PARAMS_ERROR_CODE, msg, Some(msg_id)) + } + Error::InvalidRequest(msg) => { + pack_err_res(message::INVALID_REQUEST_ERROR_CODE, msg, Some(msg_id)) + } + Error::RPCMethodError(code, msg) => pack_err_res(code, msg, Some(msg_id)), + _ => pack_err_res( + message::INTERNAL_ERROR_CODE, + INTERNAL_ERROR_MSG, + Some(msg_id), + ), + } + } +} + +/// Builder for constructing an RPC [`Server`]. +pub struct ServerBuilder { + endpoint: Endpoint, + tcp_config: TcpConfig, + tls_config: Option, + services: HashMap>, + pubsub_services: HashMap>, +} + +impl ServerBuilder { + /// Adds a new RPC service to the server. + pub fn service(mut self, service: Arc) -> Self { + self.services.insert(service.name(), service); + self + } + + /// Adds a new PubSub RPC service to the server. + pub fn pubsub_service(mut self, service: Arc) -> Self { + self.pubsub_services.insert(service.name(), service); + self + } + + /// Configure TCP settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tcp_config = TcpConfig::default(); + /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TCP protocols. + pub fn tcp_config(mut self, config: TcpConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tcp_config = config; + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Configure TLS settings for the server. + /// + /// # Example + /// + /// ```ignore + /// let tls_config = rustls::ServerConfig::new(...); + /// let server = Server::builder()?.tls_config(tls_config)?.build()?; + /// ``` + /// + /// This function will return an error if the endpoint does not support TLS protocols. + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + /// Builds the server with the configured options. + pub async fn build(self) -> Result> { + self._build(TaskGroup::new()).await + } + + /// Builds the server with the configured options and an executor. + pub async fn build_with_executor(self, ex: Executor) -> Result> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result> { + let listener: Listener = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: self.tcp_config, + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, self.tcp_config, JsonCodec {}).await?, + ), + }, + #[cfg(feature = "ws")] + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => { + let config = ServerWsConfig { + tcp_config: self.tcp_config, + wss_config: None, + }; + Box::new(karyon_net::ws::listen(&self.endpoint, config, WsJsonCodec {}).await?) + } + }, + #[cfg(all(feature = "unix", target_family = "unix"))] + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + pubsub_services: self.pubsub_services, + })) + } +} + +impl Server { + /// Creates a new [`ServerBuilder`] + /// + /// This function initializes a `ServerBuilder` with the specified endpoint. + /// + /// # Example + /// + /// ```ignore + /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + pubsub_services: HashMap::new(), + tcp_config: Default::default(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs new file mode 100644 index 0000000..5b4bf9a --- /dev/null +++ b/jsonrpc/src/server/pubsub_service.rs @@ -0,0 +1,67 @@ +use std::{future::Future, pin::Pin}; + +use crate::Result; + +use super::channel::ArcChannel; + +/// Represents the RPC method +pub type PubSubRPCMethod<'a> = + Box PubSubRPCMethodOutput<'a> + Send + 'a>; +type PubSubRPCMethodOutput<'a> = + Pin> + Send + Sync + 'a>>; + +/// Defines the interface for an RPC service. +pub trait PubSubRPCService: Sync + Send { + fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option; + fn name(&self) -> String; +} + +/// Implements the [`PubSubRPCService`] trait for a provided type. +/// +/// # Example +/// +/// ``` +/// use serde_json::Value; +/// +/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// +/// struct Hello {} +/// +/// impl Hello { +/// async fn foo(&self, params: Value) -> Result { +/// Ok(serde_json::json!("foo!")) +/// } +/// +/// async fn bar(&self, params: Value) -> Result { +/// Ok(serde_json::json!("bar!")) +/// } +/// } +/// +/// impl_rpc_service!(Hello, foo, bar); +/// +/// ``` +#[macro_export] +macro_rules! impl_pubsub_rpc_service { + ($t:ty, $($m:ident),*) => { + impl karyon_jsonrpc::PubSubRPCService for $t { + fn get_pubsub_method<'a>( + &'a self, + name: &'a str + ) -> Option { + match name { + $( + stringify!($m) => { + Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, params: serde_json::Value| Box::pin(self.$m(chan, params)))) + } + )* + _ => None, + } + + + } + fn name(&self) -> String{ + stringify!($t).to_string() + } + } + }; +} diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs new file mode 100644 index 0000000..4c8c4b8 --- /dev/null +++ b/jsonrpc/src/server/service.rs @@ -0,0 +1,64 @@ +use std::{future::Future, pin::Pin}; + +use crate::Result; + +/// Represents the RPC method +pub type RPCMethod<'a> = Box RPCMethodOutput<'a> + Send + 'a>; +type RPCMethodOutput<'a> = + Pin> + Send + Sync + 'a>>; + +/// Defines the interface for an RPC service. +pub trait RPCService: Sync + Send { + fn get_method<'a>(&'a self, name: &'a str) -> Option; + fn name(&self) -> String; +} + +/// Implements the [`RPCService`] trait for a provided type. +/// +/// # Example +/// +/// ``` +/// use serde_json::Value; +/// +/// use karyon_jsonrpc::{Error, impl_rpc_service}; +/// +/// struct Hello {} +/// +/// impl Hello { +/// async fn foo(&self, params: Value) -> Result { +/// Ok(serde_json::json!("foo!")) +/// } +/// +/// async fn bar(&self, params: Value) -> Result { +/// Ok(serde_json::json!("bar!")) +/// } +/// } +/// +/// impl_rpc_service!(Hello, foo, bar); +/// +/// ``` +#[macro_export] +macro_rules! impl_rpc_service { + ($t:ty, $($m:ident),*) => { + impl karyon_jsonrpc::RPCService for $t { + fn get_method<'a>( + &'a self, + name: &'a str + ) -> Option { + match name { + $( + stringify!($m) => { + Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) + } + )* + _ => None, + } + + + } + fn name(&self) -> String{ + stringify!($t).to_string() + } + } + }; +} diff --git a/jsonrpc/src/service.rs b/jsonrpc/src/service.rs deleted file mode 100644 index 4c8c4b8..0000000 --- a/jsonrpc/src/service.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{future::Future, pin::Pin}; - -use crate::Result; - -/// Represents the RPC method -pub type RPCMethod<'a> = Box RPCMethodOutput<'a> + Send + 'a>; -type RPCMethodOutput<'a> = - Pin> + Send + Sync + 'a>>; - -/// Defines the interface for an RPC service. -pub trait RPCService: Sync + Send { - fn get_method<'a>(&'a self, name: &'a str) -> Option; - fn name(&self) -> String; -} - -/// Implements the [`RPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{Error, impl_rpc_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// impl_rpc_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! impl_rpc_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { - fn get_method<'a>( - &'a self, - name: &'a str - ) -> Option { - match name { - $( - stringify!($m) => { - Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} -- cgit v1.2.3