From 3429caa87699d986f799a11f6e0f4526e723b655 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 14 Jun 2024 22:49:53 +0200 Subject: jsonrpc: client use unbounded channels as buffer for sending requests & clean up examples --- jsonrpc/src/server/channel.rs | 3 ++- jsonrpc/src/server/mod.rs | 62 +++++++++++++++++++++---------------------- 2 files changed, 33 insertions(+), 32 deletions(-) (limited to 'jsonrpc/src/server') diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 9278c8c..b5c9184 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -4,6 +4,7 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32}; use crate::{message::SubscriptionID, Error, Result}; +#[derive(Debug)] pub(crate) struct NewNotification { pub sub_id: SubscriptionID, pub result: serde_json::Value, @@ -20,7 +21,7 @@ pub struct Subscription { } impl Subscription { - /// Creates a new `Subscription` + /// Creates a new [`Subscription`] fn new( parent: Arc, id: SubscriptionID, diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 09850c5..86b1b31 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -16,8 +16,6 @@ use crate::{message, Error, PubSubRPCService, RPCService, Result}; use channel::Channel; use response_queue::ResponseQueue; -const CHANNEL_CAP: usize = 10; - pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; @@ -49,7 +47,7 @@ impl Server { } /// Starts the RPC server - pub async fn start(self: &Arc) { + pub fn start(self: &Arc) { let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Accept loop stopped: {err}"); @@ -87,7 +85,9 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let conn = Arc::new(conn); + + let (ch_tx, ch_rx) = async_channel::unbounded(); // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); @@ -99,28 +99,37 @@ impl Server { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } - // close the subscription channel + // Close the connection subscription channel chan.close(); }; + let conn_cloned = conn.clone(); let queue_cloned = queue.clone(); - // Start listing to new notifications coming from rpc services - // Push notifications as responses to the response queue + // Start listening for responses in the queue or new notifications self.task_group.spawn( async move { loop { - let nt = ch_rx.recv().await?; - let params = Some(serde_json::json!(message::NotificationResult { - subscription: nt.sub_id, - result: Some(nt.result), - })); - let notification = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: nt.method, - params, - }; - debug!("--> {notification}"); - queue_cloned.push(serde_json::json!(notification)).await; + // The select function will prioritize the first future if both futures are ready. + // This gives priority to the responses in the response queue. + match select(queue_cloned.recv(), ch_rx.recv()).await { + Either::Left(res) => { + conn_cloned.send(res).await?; + } + Either::Right(notification) => { + let nt = notification?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let notification = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + // debug!("--> {notification}"); + conn_cloned.send(serde_json::json!(notification)).await?; + } + } } }, on_complete, @@ -138,21 +147,12 @@ impl Server { }; let selfc = self.clone(); - // Spawn a new task and wait for either a new response in the response - // queue or a new request coming from a connected client. + // Spawn a new task and wait for requests. self.task_group.spawn( async move { loop { - match select(conn.recv(), queue.recv()).await { - Either::Left(msg) => { - selfc - .new_request(queue.clone(), channel.clone(), msg?) - .await; - } - Either::Right(res) => { - conn.send(res).await?; - } - } + let msg = conn.recv().await?; + selfc.new_request(queue.clone(), channel.clone(), msg).await; } }, on_complete, -- cgit v1.2.3