aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-30 02:07:50 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-30 02:07:50 +0200
commita06239ccc5e21fd20182ec3046cf9174ecc58a43 (patch)
tree12bf68067ba377f8bb081e98c3e3a4ac4fcd13b7 /jsonrpc/src/server
parent34f528a3b4063b4a25915e60d7f22ee1fb2d1bd9 (diff)
jsonrpc/server: use queue with condvar instead of async channels
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r--jsonrpc/src/server/channel.rs10
-rw-r--r--jsonrpc/src/server/mod.rs75
2 files changed, 69 insertions, 16 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index efcd344..a9d1002 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -59,7 +59,7 @@ pub struct Channel {
}
impl Channel {
- /// Creates a new `Channel`
+ /// Creates a new [`Channel`]
pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
Arc::new(Self {
chan,
@@ -67,7 +67,7 @@ impl Channel {
})
}
- /// Creates a new subscription
+ /// Creates a new [`Subscription`]
pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {
let sub_id = random_32();
let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method);
@@ -76,7 +76,7 @@ impl Channel {
}
/// Removes a subscription
- pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) {
+ pub async fn remove_subscription(&self, id: &SubscriptionID) {
let mut subs = self.subs.lock().await;
let i = match subs.iter().position(|i| i == id) {
Some(i) => i,
@@ -84,4 +84,8 @@ impl Channel {
};
subs.remove(i);
}
+
+ pub fn close(&self) {
+ self.chan.close();
+ }
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7f28de2..7136be4 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -3,11 +3,17 @@ pub mod channel;
pub mod pubsub_service;
pub mod service;
-use std::{collections::HashMap, sync::Arc};
+use std::{
+ collections::{HashMap, VecDeque},
+ sync::Arc,
+};
use log::{debug, error, trace, warn};
-use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
+use karyon_core::{
+ async_runtime::lock::Mutex,
+ async_util::{select, CondVar, Either, TaskGroup, TaskResult},
+};
use karyon_net::{Conn, Endpoint, Listener};
use crate::{message, Error, PubSubRPCService, RPCService, Result};
@@ -47,6 +53,42 @@ enum SanityCheckResult {
ErrRes(message::Response),
}
+struct ResponseQueue<T> {
+ queue: Mutex<VecDeque<T>>,
+ condvar: CondVar,
+}
+
+impl<T> ResponseQueue<T> {
+ fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(VecDeque::new()),
+ condvar: CondVar::new(),
+ })
+ }
+
+ /// Wait while the queue is empty, remove and return the item from the queue,
+ /// panicking if empty (shouldn't happen)
+ async fn recv(&self) -> T {
+ let mut queue = self.queue.lock().await;
+
+ while queue.is_empty() {
+ queue = self.condvar.wait(queue).await;
+ }
+
+ match queue.pop_front() {
+ Some(v) => v,
+ None => unreachable!(),
+ }
+ }
+
+ /// Push an item into the queue, notify all waiting tasks that the
+ /// condvar has changed
+ async fn push(&self, res: T) {
+ self.queue.lock().await.push_back(res);
+ self.condvar.broadcast();
+ }
+}
+
/// Represents an RPC server
pub struct Server {
listener: Listener<serde_json::Value>,
@@ -99,20 +141,21 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
- // TODO Avoid depending on channels
- let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP);
-
let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
let channel = Channel::new(ch_tx);
+ let queue = ResponseQueue::new();
+
+ let chan = channel.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
+ // close the subscription channel
+ chan.close();
};
- let selfc = self.clone();
- let txc = tx.clone();
+ let queue_cloned = queue.clone();
self.task_group.spawn(
async move {
loop {
@@ -127,28 +170,34 @@ impl Server {
params,
};
debug!("--> {response}");
- txc.send(serde_json::to_value(response)?).await?;
+ queue_cloned.push(serde_json::json!(response)).await;
}
},
on_failure,
);
+ let chan = channel.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
} else {
warn!("Connection {} dropped", endpoint);
}
+ // close the subscription channel when the connection dropped
+ chan.close();
};
+ let selfc = self.clone();
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), rx.recv()).await {
+ match select(conn.recv(), queue.recv()).await {
Either::Left(msg) => {
- selfc.new_request(tx.clone(), channel.clone(), msg?).await;
+ selfc
+ .new_request(queue.clone(), channel.clone(), msg?)
+ .await;
}
- Either::Right(msg) => conn.send(msg?).await?,
+ Either::Right(res) => conn.send(res).await?,
}
}
},
@@ -194,7 +243,7 @@ impl Server {
/// Spawns a new task for handling a new request
async fn new_request(
self: &Arc<Self>,
- sender: async_channel::Sender<serde_json::Value>,
+ queue: Arc<ResponseQueue<serde_json::Value>>,
channel: ArcChannel,
msg: serde_json::Value,
) {
@@ -209,7 +258,7 @@ impl Server {
async move {
let response = selfc.handle_request(channel, msg).await;
debug!("--> {response}");
- sender.send(serde_json::json!(response)).await?;
+ queue.push(serde_json::json!(response)).await;
Ok(())
},
on_failure,