diff options
Diffstat (limited to 'jsonrpc/src')
-rw-r--r-- | jsonrpc/src/client/mod.rs | 7 | ||||
-rw-r--r-- | jsonrpc/src/server/channel.rs | 10 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 75 |
3 files changed, 75 insertions, 17 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 326252a..3a4505c 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -149,7 +149,12 @@ impl Client { async move { loop { let msg = selfc.conn.recv().await?; - selfc.handle_msg(msg).await?; + if let Err(err) = selfc.handle_msg(msg).await { + error!( + "Failed to handle a new received msg from the connection {} : {err}", + selfc.conn.peer_endpoint()? + ); + } } }, on_failure, diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index efcd344..a9d1002 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -59,7 +59,7 @@ pub struct Channel { } impl Channel { - /// Creates a new `Channel` + /// Creates a new [`Channel`] pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel { Arc::new(Self { chan, @@ -67,7 +67,7 @@ impl Channel { }) } - /// Creates a new subscription + /// Creates a new [`Subscription`] pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription { let sub_id = random_32(); let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method); @@ -76,7 +76,7 @@ impl Channel { } /// Removes a subscription - pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) { + pub async fn remove_subscription(&self, id: &SubscriptionID) { let mut subs = self.subs.lock().await; let i = match subs.iter().position(|i| i == id) { Some(i) => i, @@ -84,4 +84,8 @@ impl Channel { }; subs.remove(i); } + + pub fn close(&self) { + self.chan.close(); + } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7f28de2..7136be4 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -3,11 +3,17 @@ pub mod channel; pub mod pubsub_service; pub mod service; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ + async_runtime::lock::Mutex, + async_util::{select, CondVar, Either, TaskGroup, TaskResult}, +}; use karyon_net::{Conn, Endpoint, Listener}; use crate::{message, Error, PubSubRPCService, RPCService, Result}; @@ -47,6 +53,42 @@ enum SanityCheckResult { ErrRes(message::Response), } +struct ResponseQueue<T> { + queue: Mutex<VecDeque<T>>, + condvar: CondVar, +} + +impl<T> ResponseQueue<T> { + fn new() -> Arc<Self> { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.broadcast(); + } +} + /// Represents an RPC server pub struct Server { listener: Listener<serde_json::Value>, @@ -99,20 +141,21 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); - // TODO Avoid depending on channels - let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); let channel = Channel::new(ch_tx); + let queue = ResponseQueue::new(); + + let chan = channel.clone(); let on_failure = |result: TaskResult<Result<()>>| async move { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } + // close the subscription channel + chan.close(); }; - let selfc = self.clone(); - let txc = tx.clone(); + let queue_cloned = queue.clone(); self.task_group.spawn( async move { loop { @@ -127,28 +170,34 @@ impl Server { params, }; debug!("--> {response}"); - txc.send(serde_json::to_value(response)?).await?; + queue_cloned.push(serde_json::json!(response)).await; } }, on_failure, ); + let chan = channel.clone(); let on_failure = |result: TaskResult<Result<()>>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); } else { warn!("Connection {} dropped", endpoint); } + // close the subscription channel when the connection dropped + chan.close(); }; + let selfc = self.clone(); self.task_group.spawn( async move { loop { - match select(conn.recv(), rx.recv()).await { + match select(conn.recv(), queue.recv()).await { Either::Left(msg) => { - selfc.new_request(tx.clone(), channel.clone(), msg?).await; + selfc + .new_request(queue.clone(), channel.clone(), msg?) + .await; } - Either::Right(msg) => conn.send(msg?).await?, + Either::Right(res) => conn.send(res).await?, } } }, @@ -194,7 +243,7 @@ impl Server { /// Spawns a new task for handling a new request async fn new_request( self: &Arc<Self>, - sender: async_channel::Sender<serde_json::Value>, + queue: Arc<ResponseQueue<serde_json::Value>>, channel: ArcChannel, msg: serde_json::Value, ) { @@ -209,7 +258,7 @@ impl Server { async move { let response = selfc.handle_request(channel, msg).await; debug!("--> {response}"); - sender.send(serde_json::json!(response)).await?; + queue.push(serde_json::json!(response)).await; Ok(()) }, on_failure, |