From 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 13 Jun 2024 05:52:48 +0200 Subject: jsonrpc: spread out comments and clean up --- jsonrpc/README.md | 19 ++++- jsonrpc/examples/pubsub_server.rs | 8 +- jsonrpc/examples/tokio_server/Cargo.lock | 8 +- jsonrpc/examples/tokio_server/src/main.rs | 12 ++- jsonrpc/src/client/builder.rs | 10 +-- jsonrpc/src/client/message_dispatcher.rs | 54 ++++++++++++ jsonrpc/src/client/mod.rs | 107 +++++++++--------------- jsonrpc/src/client/subscriber.rs | 64 +++++++++++++++ jsonrpc/src/lib.rs | 2 +- jsonrpc/src/server/channel.rs | 8 +- jsonrpc/src/server/mod.rs | 131 ++++++++++++------------------ jsonrpc/src/server/pubsub_service.rs | 9 +- jsonrpc/src/server/response_queue.rs | 40 +++++++++ 13 files changed, 293 insertions(+), 179 deletions(-) create mode 100644 jsonrpc/src/client/message_dispatcher.rs create mode 100644 jsonrpc/src/client/subscriber.rs create mode 100644 jsonrpc/src/server/response_queue.rs (limited to 'jsonrpc') diff --git a/jsonrpc/README.md b/jsonrpc/README.md index 5fc6847..b4ee580 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -13,6 +13,15 @@ features: - Allows passing an `async_executors::Executor` or tokio's `Runtime` when building the server. + +## Install + +```bash + +$ cargo add karyon_jsonrpc + +``` + ## Example ```rust @@ -23,7 +32,7 @@ use smol::stream::StreamExt; use karyon_jsonrpc::{ Error, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID, - ArcChannel + Channel }; struct HelloWorld {} @@ -46,7 +55,7 @@ impl HelloWorld { #[rpc_pubsub_impl] impl HelloWorld { - async fn log_subscribe(&self, chan: ArcChannel, method: String, _params: Value) -> Result { + async fn log_subscribe(&self, chan: Arc, method: String, _params: Value) -> Result { let sub = chan.new_subscription(&method).await; let sub_id = sub.id.clone(); smol::spawn(async move { @@ -63,7 +72,7 @@ impl HelloWorld { Ok(serde_json::json!(sub_id)) } - async fn log_unsubscribe(&self, chan: ArcChannel, method: String, params: Value) -> Result { + async fn log_unsubscribe(&self, chan: Arc, method: String, params: Value) -> Result { let sub_id: SubscriptionID = serde_json::from_value(params)?; chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) @@ -126,6 +135,10 @@ async { ``` +## Supported Client Implementations +- [X] [Golang](https://github.com/karyontech/karyon-go) +- [ ] Python +- [ ] JavaScript/TypeScript diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs index bae5b37..74eb907 100644 --- a/jsonrpc/examples/pubsub_server.rs +++ b/jsonrpc/examples/pubsub_server.rs @@ -4,9 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use karyon_core::async_util::sleep; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, -}; +use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server}; struct Calc {} @@ -30,7 +28,7 @@ impl Calc { impl Calc { async fn log_subscribe( &self, - chan: ArcChannel, + chan: Arc, method: String, _params: Value, ) -> Result { @@ -52,7 +50,7 @@ impl Calc { async fn log_unsubscribe( &self, - chan: ArcChannel, + chan: Arc, _method: String, params: Value, ) -> Result { diff --git a/jsonrpc/examples/tokio_server/Cargo.lock b/jsonrpc/examples/tokio_server/Cargo.lock index 16926fc..bd7d714 100644 --- a/jsonrpc/examples/tokio_server/Cargo.lock +++ b/jsonrpc/examples/tokio_server/Cargo.lock @@ -663,7 +663,7 @@ dependencies = [ [[package]] name = "karyon_core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "bincode", @@ -680,7 +680,7 @@ dependencies = [ [[package]] name = "karyon_jsonrpc" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -698,7 +698,7 @@ dependencies = [ [[package]] name = "karyon_jsonrpc_macro" -version = "0.1.0" +version = "0.1.1" dependencies = [ "proc-macro2", "quote", @@ -708,7 +708,7 @@ dependencies = [ [[package]] name = "karyon_net" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 3bb4871..0a47fda 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, -}; +use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server}; struct Calc { version: String, @@ -45,16 +43,16 @@ impl Calc { impl Calc { async fn log_subscribe( &self, - chan: ArcChannel, + chan: Arc, method: String, _params: Value, ) -> Result { let sub = chan.new_subscription(&method).await; - let sub_id = sub.id.clone(); + let sub_id = sub.id; tokio::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; - if let Err(_) = sub.notify(serde_json::json!("Hello")).await { + if sub.notify(serde_json::json!("Hello")).await.is_err() { break; } } @@ -65,7 +63,7 @@ impl Calc { async fn log_unsubscribe( &self, - chan: ArcChannel, + chan: Arc, _method: String, params: Value, ) -> Result { diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs index a287070..2263498 100644 --- a/jsonrpc/src/client/builder.rs +++ b/jsonrpc/src/client/builder.rs @@ -1,11 +1,11 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; #[cfg(feature = "smol")] use futures_rustls::rustls; #[cfg(feature = "tokio")] use tokio_rustls::rustls; -use karyon_core::{async_runtime::lock::Mutex, async_util::TaskGroup}; +use karyon_core::async_util::TaskGroup; use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; #[cfg(feature = "ws")] @@ -16,7 +16,7 @@ use crate::codec::WsJsonCodec; use crate::{codec::JsonCodec, Error, Result, TcpConfig}; -use super::Client; +use super::{Client, MessageDispatcher, Subscriber}; const DEFAULT_TIMEOUT: u64 = 3000; // 3s @@ -171,8 +171,8 @@ impl ClientBuilder { let client = Arc::new(Client { timeout: self.timeout, conn, - chans: Mutex::new(HashMap::new()), - subscriptions: Mutex::new(HashMap::new()), + message_dispatcher: MessageDispatcher::new(), + subscriber: Subscriber::new(), task_group: TaskGroup::new(), }); client.start_background_receiving(); diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs new file mode 100644 index 0000000..a803f6e --- /dev/null +++ b/jsonrpc/src/client/message_dispatcher.rs @@ -0,0 +1,54 @@ +use std::collections::HashMap; + +use async_channel::{Receiver, Sender}; + +use karyon_core::async_runtime::lock::Mutex; + +use crate::{message, Error, Result}; + +use super::RequestID; + +const CHANNEL_CAP: usize = 10; + +/// Manages client requests +pub(super) struct MessageDispatcher { + chans: Mutex>>, +} + +impl MessageDispatcher { + /// Creates a new MessageDispatcher + pub(super) fn new() -> Self { + Self { + chans: Mutex::new(HashMap::new()), + } + } + + /// Registers a new request with a given ID and returns a Receiver channel + /// to wait for the response. + pub(super) async fn register(&self, id: RequestID) -> Receiver { + let (tx, rx) = async_channel::bounded(CHANNEL_CAP); + self.chans.lock().await.insert(id, tx); + rx + } + + /// Unregisters the request with the provided ID + pub(super) async fn unregister(&self, id: &RequestID) { + self.chans.lock().await.remove(id); + } + + /// Dispatches a response to the channel associated with the response's ID. + /// + /// If a channel is registered for the response's ID, the response is sent + /// through that channel. If no channel is found for the ID, returns an error. + pub(super) async fn dispatch(&self, res: message::Response) -> Result<()> { + if res.id.is_none() { + return Err(Error::InvalidMsg("Response id is none")); + } + let id: RequestID = serde_json::from_value(res.id.clone().unwrap())?; + let val = self.chans.lock().await.remove(&id); + match val { + Some(tx) => tx.send(res).await.map_err(Error::from), + None => Err(Error::InvalidMsg("Receive unknown message")), + } + } +} diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 3a4505c..95354d3 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -1,13 +1,13 @@ pub mod builder; +mod message_dispatcher; +mod subscriber; -use std::{collections::HashMap, sync::Arc, time::Duration}; - -use log::{debug, error, warn}; +use log::{debug, error}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::json; +use std::{sync::Arc, time::Duration}; use karyon_core::{ - async_runtime::lock::Mutex, async_util::{timeout, TaskGroup, TaskResult}, util::random_32, }; @@ -18,21 +18,19 @@ use crate::{ Error, Result, }; -const CHANNEL_CAP: usize = 10; +use message_dispatcher::MessageDispatcher; +use subscriber::Subscriber; +pub use subscriber::Subscription; -/// Type alias for a subscription to receive notifications. -/// -/// The receiver channel is returned by the `subscribe` method to receive -/// notifications from the server. -pub type Subscription = async_channel::Receiver; +type RequestID = u32; /// Represents an RPC client pub struct Client { conn: Conn, timeout: Option, - chans: Mutex>>, - subscriptions: Mutex>>, + message_dispatcher: MessageDispatcher, task_group: TaskGroup, + subscriber: Subscriber, } impl Client { @@ -67,10 +65,9 @@ impl Client { None => return Err(Error::InvalidMsg("Invalid subscription id")), }; - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); - self.subscriptions.lock().await.insert(sub_id, ch_tx); + let rx = self.subscriber.subscribe(sub_id).await; - Ok((sub_id, ch_rx)) + Ok((sub_id, rx)) } /// Unsubscribes from the provided method, waits for the response, and returns the result. @@ -79,7 +76,7 @@ impl Client { /// and subscription ID. It waits for the response to confirm the unsubscription. pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { let _ = self.send_request(method, sub_id).await?; - self.subscriptions.lock().await.remove(&sub_id); + self.subscriber.unsubscribe(&sub_id).await; Ok(()) } @@ -88,7 +85,7 @@ impl Client { method: &str, params: T, ) -> Result { - let id = random_32(); + let id: RequestID = random_32(); let request = message::Request { jsonrpc: message::JSONRPC_VERSION.to_string(), id: json!(id), @@ -98,16 +95,24 @@ impl Client { let req_json = serde_json::to_value(&request)?; + // Send the json request self.conn.send(req_json).await?; - let (tx, rx) = async_channel::bounded(CHANNEL_CAP); - self.chans.lock().await.insert(id, tx); + // Register a new request + let rx = self.message_dispatcher.register(id).await; + + // Wait for the message dispatcher to send the response + let result = match self.timeout { + Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?, + None => rx.recv().await, + }; - let response = match self.wait_for_response(rx).await { + let response = match result { Ok(r) => r, Err(err) => { - self.chans.lock().await.remove(&id); - return Err(err); + // Unregister the request if an error occurs + self.message_dispatcher.unregister(&id).await; + return Err(err.into()); } }; @@ -115,6 +120,8 @@ impl Client { return Err(Error::SubscribeError(error.code, error.message)); } + // It should be OK to unwrap here, as the message dispatcher checks + // for the response id. if *response.id.as_ref().unwrap() != request.id { return Err(Error::InvalidMsg("Invalid response id")); } @@ -123,28 +130,17 @@ impl Client { Ok(response) } - async fn wait_for_response( - &self, - rx: async_channel::Receiver, - ) -> Result { - match self.timeout { - Some(t) => timeout(Duration::from_millis(t), rx.recv()) - .await? - .map_err(Error::from), - None => rx.recv().await.map_err(Error::from), - } - } - fn start_background_receiving(self: &Arc) { let selfc = self.clone(); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("background receiving stopped: {err}"); } - // drop all subscription channels - selfc.subscriptions.lock().await.clear(); + // Drop all subscription + selfc.subscriber.drop_all().await; }; let selfc = self.clone(); + // Spawn a new task for listing to new coming messages. self.task_group.spawn( async move { loop { @@ -157,48 +153,23 @@ impl Client { } } }, - on_failure, + on_complete, ); } async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> { + // Check if the received message is of type Response if let Ok(res) = serde_json::from_value::(msg.clone()) { debug!("<-- {res}"); - if res.id.is_none() { - return Err(Error::InvalidMsg("Response id is none")); - } - - let id: u32 = serde_json::from_value(res.id.clone().unwrap())?; - match self.chans.lock().await.remove(&id) { - Some(tx) => tx.send(res).await?, - None => return Err(Error::InvalidMsg("Receive unkown message")), - } - + self.message_dispatcher.dispatch(res).await?; return Ok(()); } + // Check if the received message is of type Notification if let Ok(nt) = serde_json::from_value::(msg.clone()) { debug!("<-- {nt}"); - let sub_result: message::NotificationResult = match nt.params { - Some(ref p) => serde_json::from_value(p.clone())?, - None => return Err(Error::InvalidMsg("Invalid notification msg")), - }; - - match self - .subscriptions - .lock() - .await - .get(&sub_result.subscription) - { - Some(s) => { - s.send(sub_result.result.unwrap_or(json!(""))).await?; - return Ok(()); - } - None => { - warn!("Receive unknown notification {}", sub_result.subscription); - return Ok(()); - } - } + self.subscriber.notify(nt).await?; + return Ok(()); } error!("Receive unexpected msg: {msg}"); diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs new file mode 100644 index 0000000..d47cc2a --- /dev/null +++ b/jsonrpc/src/client/subscriber.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use async_channel::{Receiver, Sender}; +use log::warn; +use serde_json::json; + +use karyon_core::async_runtime::lock::Mutex; + +use crate::{ + message::{Notification, NotificationResult, SubscriptionID}, + Error, Result, +}; + +const CHANNEL_CAP: usize = 10; + +/// Manages subscriptions for the client. +pub(super) struct Subscriber { + subs: Mutex>>, +} + +/// Type alias for a subscription to receive notifications. +/// +/// The receiver channel is returned by the `subscribe` +pub type Subscription = Receiver; + +impl Subscriber { + pub(super) fn new() -> Self { + Self { + subs: Mutex::new(HashMap::new()), + } + } + + pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver { + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + self.subs.lock().await.insert(id, ch_tx); + ch_rx + } + + pub(super) async fn drop_all(&self) { + self.subs.lock().await.clear(); + } + + /// Unsubscribe + pub(super) async fn unsubscribe(&self, id: &SubscriptionID) { + self.subs.lock().await.remove(id); + } + + pub(super) async fn notify(&self, nt: Notification) -> Result<()> { + let nt_res: NotificationResult = match nt.params { + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), + }; + match self.subs.lock().await.get(&nt_res.subscription) { + Some(s) => { + s.send(nt_res.result.unwrap_or(json!(""))).await?; + Ok(()) + } + None => { + warn!("Receive unknown notification {}", nt_res.subscription); + Ok(()) + } + } + } +} diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index b7b632e..23a6e08 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -10,7 +10,7 @@ pub use client::{builder::ClientBuilder, Client}; pub use error::{Error, Result}; pub use server::{ builder::ServerBuilder, - channel::{ArcChannel, Channel, Subscription}, + channel::{Channel, Subscription}, pubsub_service::{PubSubRPCMethod, PubSubRPCService}, service::{RPCMethod, RPCService}, Server, diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index a9d1002..bb62b9f 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -4,8 +4,6 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32}; use crate::{message::SubscriptionID, Error, Result}; -pub type ArcChannel = Arc; - pub(crate) struct NewNotification { pub sub_id: SubscriptionID, pub result: serde_json::Value, @@ -52,7 +50,7 @@ impl Subscription { } } -/// Represents a channel for creating/removing subscriptions +/// Represents a connection channel for creating/removing subscriptions pub struct Channel { chan: async_channel::Sender, subs: Mutex>, @@ -60,7 +58,7 @@ pub struct Channel { impl Channel { /// Creates a new [`Channel`] - pub(crate) fn new(chan: async_channel::Sender) -> ArcChannel { + pub(crate) fn new(chan: async_channel::Sender) -> Arc { Arc::new(Self { chan, subs: Mutex::new(Vec::new()), @@ -75,7 +73,7 @@ impl Channel { sub } - /// Removes a subscription + /// Removes a [`Subscription`] pub async fn remove_subscription(&self, id: &SubscriptionID) { let mut subs = self.subs.lock().await; let i = match subs.iter().position(|i| i == id) { diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7136be4..09850c5 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -1,24 +1,20 @@ pub mod builder; pub mod channel; pub mod pubsub_service; +mod response_queue; pub mod service; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -use karyon_core::{ - async_runtime::lock::Mutex, - async_util::{select, CondVar, Either, TaskGroup, TaskResult}, -}; +use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; use karyon_net::{Conn, Endpoint, Listener}; use crate::{message, Error, PubSubRPCService, RPCService, Result}; -use channel::{ArcChannel, Channel}; +use channel::Channel; +use response_queue::ResponseQueue; const CHANNEL_CAP: usize = 10; @@ -27,21 +23,6 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; pub const INTERNAL_ERROR_MSG: &str = "Internal error"; -fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { - let err = message::Error { - code, - message: msg.to_string(), - data: None, - }; - - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: Some(err), - result: None, - id, - } -} - struct NewRequest { srvc_name: String, method_name: String, @@ -53,42 +34,6 @@ enum SanityCheckResult { ErrRes(message::Response), } -struct ResponseQueue { - queue: Mutex>, - condvar: CondVar, -} - -impl ResponseQueue { - fn new() -> Arc { - Arc::new(Self { - queue: Mutex::new(VecDeque::new()), - condvar: CondVar::new(), - }) - } - - /// Wait while the queue is empty, remove and return the item from the queue, - /// panicking if empty (shouldn't happen) - async fn recv(&self) -> T { - let mut queue = self.queue.lock().await; - - while queue.is_empty() { - queue = self.condvar.wait(queue).await; - } - - match queue.pop_front() { - Some(v) => v, - None => unreachable!(), - } - } - - /// Push an item into the queue, notify all waiting tasks that the - /// condvar has changed - async fn push(&self, res: T) { - self.queue.lock().await.push_back(res); - self.condvar.broadcast(); - } -} - /// Represents an RPC server pub struct Server { listener: Listener, @@ -105,13 +50,14 @@ impl Server { /// Starts the RPC server pub async fn start(self: &Arc) { - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Accept loop stopped: {err}"); } }; let selfc = self.clone(); + // Spawns a new task for each new incoming connection self.task_group.spawn( async move { loop { @@ -127,7 +73,7 @@ impl Server { } } }, - on_failure, + on_complete, ); } @@ -142,12 +88,14 @@ impl Server { debug!("Handle a new connection {endpoint}"); let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); + // Create a response queue let queue = ResponseQueue::new(); let chan = channel.clone(); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } @@ -156,6 +104,8 @@ impl Server { }; let queue_cloned = queue.clone(); + // Start listing to new notifications coming from rpc services + // Push notifications as responses to the response queue self.task_group.spawn( async move { loop { @@ -164,30 +114,32 @@ impl Server { subscription: nt.sub_id, result: Some(nt.result), })); - let response = message::Notification { + let notification = message::Notification { jsonrpc: message::JSONRPC_VERSION.to_string(), method: nt.method, params, }; - debug!("--> {response}"); - queue_cloned.push(serde_json::json!(response)).await; + debug!("--> {notification}"); + queue_cloned.push(serde_json::json!(notification)).await; } }, - on_failure, + on_complete, ); let chan = channel.clone(); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); } else { warn!("Connection {} dropped", endpoint); } - // close the subscription channel when the connection dropped + // Close the subscription channel when the connection dropped chan.close(); }; let selfc = self.clone(); + // Spawn a new task and wait for either a new response in the response + // queue or a new request coming from a connected client. self.task_group.spawn( async move { loop { @@ -197,11 +149,13 @@ impl Server { .new_request(queue.clone(), channel.clone(), msg?) .await; } - Either::Right(res) => conn.send(res).await?, + Either::Right(res) => { + conn.send(res).await?; + } } } }, - on_failure, + on_complete, ); Ok(()) @@ -220,6 +174,7 @@ impl Server { }; debug!("<-- {rpc_msg}"); + // Parse the service name and its method let srvc_method_str = rpc_msg.method.clone(); let srvc_method: Vec<&str> = srvc_method_str.split('.').collect(); if srvc_method.len() < 2 { @@ -240,20 +195,22 @@ impl Server { }) } - /// Spawns a new task for handling a new request + /// Spawns a new task for handling the new request async fn new_request( self: &Arc, queue: Arc>, - channel: ArcChannel, + channel: Arc, msg: serde_json::Value, ) { trace!("--> new request {msg}"); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Failed to handle a request: {err}"); } }; let selfc = self.clone(); + // Spawns a new task for handling the new request, and push the + // response to the response queue. self.task_group.spawn( async move { let response = selfc.handle_request(channel, msg).await; @@ -261,14 +218,15 @@ impl Server { queue.push(serde_json::json!(response)).await; Ok(()) }, - on_failure, + on_complete, ); } - /// Handles a new request + /// Handles the new request, and returns an RPC Response that has either + /// an error or result async fn handle_request( &self, - channel: ArcChannel, + channel: Arc, msg: serde_json::Value, ) -> message::Response { let req = match self.sanity_check(msg) { @@ -283,7 +241,9 @@ impl Server { id: Some(req.msg.id.clone()), }; + // Check if the service exists in pubsub services list if let Some(service) = self.pubsub_services.get(&req.srvc_name) { + // Check if the method exists within the service if let Some(method) = service.get_pubsub_method(&req.method_name) { let name = format!("{}.{}", service.name(), req.method_name); let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -296,7 +256,9 @@ impl Server { } } + // Check if the service exists in services list if let Some(service) = self.services.get(&req.srvc_name) { + // Check if the method exists within the service if let Some(method) = service.get_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); response.result = match method(params).await { @@ -337,3 +299,18 @@ impl Server { } } } + +fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { + let err = message::Error { + code, + message: msg.to_string(), + data: None, + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: Some(err), + result: None, + id, + } +} diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index 5b3b50b..08d1bbb 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -1,12 +1,12 @@ -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, sync::Arc}; use crate::Result; -use super::channel::ArcChannel; +use super::channel::Channel; /// Represents the RPC method pub type PubSubRPCMethod<'a> = - Box PubSubRPCMethodOutput<'a> + Send + 'a>; + Box, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>; type PubSubRPCMethodOutput<'a> = Pin> + Send + Sync + 'a>>; @@ -51,7 +51,8 @@ macro_rules! impl_pubsub_rpc_service { match name { $( stringify!($m) => { - Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| { + Some(Box::new( + move |chan: std::sync::Arc, method: String, params: serde_json::Value| { Box::pin(self.$m(chan, method, params)) })) } diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs new file mode 100644 index 0000000..0d70503 --- /dev/null +++ b/jsonrpc/src/server/response_queue.rs @@ -0,0 +1,40 @@ +use std::{collections::VecDeque, sync::Arc}; + +use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; + +/// A queue for handling responses +pub(super) struct ResponseQueue { + queue: Mutex>, + condvar: CondVar, +} + +impl ResponseQueue { + pub(super) fn new() -> Arc { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + pub(super) async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + pub(super) async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.signal(); + } +} -- cgit v1.2.3