aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-14 22:49:53 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-14 22:49:53 +0200
commit3429caa87699d986f799a11f6e0f4526e723b655 (patch)
treee548c356fca4eb76742e29d4bab05007468ed14b /jsonrpc/src
parent0c0699c0460c1b149915729223eec701bde481df (diff)
jsonrpc: client use unbounded channels as buffer for sending requests & clean up examples
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/client/message_dispatcher.rs4
-rw-r--r--jsonrpc/src/client/mod.rs2
-rw-r--r--jsonrpc/src/client/subscriber.rs4
-rw-r--r--jsonrpc/src/server/channel.rs3
-rw-r--r--jsonrpc/src/server/mod.rs62
5 files changed, 36 insertions, 39 deletions
diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs
index a803f6e..aa47cec 100644
--- a/jsonrpc/src/client/message_dispatcher.rs
+++ b/jsonrpc/src/client/message_dispatcher.rs
@@ -8,8 +8,6 @@ use crate::{message, Error, Result};
use super::RequestID;
-const CHANNEL_CAP: usize = 10;
-
/// Manages client requests
pub(super) struct MessageDispatcher {
chans: Mutex<HashMap<RequestID, Sender<message::Response>>>,
@@ -26,7 +24,7 @@ impl MessageDispatcher {
/// Registers a new request with a given ID and returns a Receiver channel
/// to wait for the response.
pub(super) async fn register(&self, id: RequestID) -> Receiver<message::Response> {
- let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
+ let (tx, rx) = async_channel::unbounded();
self.chans.lock().await.insert(id, tx);
rx
}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 1225f13..9c07509 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -147,7 +147,7 @@ impl Client {
let msg = selfc.conn.recv().await?;
if let Err(err) = selfc.handle_msg(msg).await {
error!(
- "Failed to handle a new received msg from the connection {} : {err}",
+ "Handle a msg from the endpoint {} : {err}",
selfc.conn.peer_endpoint()?
);
}
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
index d47cc2a..168f16e 100644
--- a/jsonrpc/src/client/subscriber.rs
+++ b/jsonrpc/src/client/subscriber.rs
@@ -11,8 +11,6 @@ use crate::{
Error, Result,
};
-const CHANNEL_CAP: usize = 10;
-
/// Manages subscriptions for the client.
pub(super) struct Subscriber {
subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
@@ -31,7 +29,7 @@ impl Subscriber {
}
pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let (ch_tx, ch_rx) = async_channel::unbounded();
self.subs.lock().await.insert(id, ch_tx);
ch_rx
}
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 9278c8c..b5c9184 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -4,6 +4,7 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32};
use crate::{message::SubscriptionID, Error, Result};
+#[derive(Debug)]
pub(crate) struct NewNotification {
pub sub_id: SubscriptionID,
pub result: serde_json::Value,
@@ -20,7 +21,7 @@ pub struct Subscription {
}
impl Subscription {
- /// Creates a new `Subscription`
+ /// Creates a new [`Subscription`]
fn new(
parent: Arc<Channel>,
id: SubscriptionID,
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 09850c5..86b1b31 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -16,8 +16,6 @@ use crate::{message, Error, PubSubRPCService, RPCService, Result};
use channel::Channel;
use response_queue::ResponseQueue;
-const CHANNEL_CAP: usize = 10;
-
pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request";
pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
@@ -49,7 +47,7 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: &Arc<Self>) {
+ pub fn start(self: &Arc<Self>) {
let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Accept loop stopped: {err}");
@@ -87,7 +85,9 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let conn = Arc::new(conn);
+
+ let (ch_tx, ch_rx) = async_channel::unbounded();
// Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
@@ -99,28 +99,37 @@ impl Server {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
- // close the subscription channel
+ // Close the connection subscription channel
chan.close();
};
+ let conn_cloned = conn.clone();
let queue_cloned = queue.clone();
- // Start listing to new notifications coming from rpc services
- // Push notifications as responses to the response queue
+ // Start listening for responses in the queue or new notifications
self.task_group.spawn(
async move {
loop {
- let nt = ch_rx.recv().await?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- queue_cloned.push(serde_json::json!(notification)).await;
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue_cloned.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn_cloned.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ // debug!("--> {notification}");
+ conn_cloned.send(serde_json::json!(notification)).await?;
+ }
+ }
}
},
on_complete,
@@ -138,21 +147,12 @@ impl Server {
};
let selfc = self.clone();
- // Spawn a new task and wait for either a new response in the response
- // queue or a new request coming from a connected client.
+ // Spawn a new task and wait for requests.
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), queue.recv()).await {
- Either::Left(msg) => {
- selfc
- .new_request(queue.clone(), channel.clone(), msg?)
- .await;
- }
- Either::Right(res) => {
- conn.send(res).await?;
- }
- }
+ let msg = conn.recv().await?;
+ selfc.new_request(queue.clone(), channel.clone(), msg).await;
}
},
on_complete,