aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-14 22:49:53 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-14 22:49:53 +0200
commit3429caa87699d986f799a11f6e0f4526e723b655 (patch)
treee548c356fca4eb76742e29d4bab05007468ed14b /jsonrpc/src/server/mod.rs
parent0c0699c0460c1b149915729223eec701bde481df (diff)
jsonrpc: client use unbounded channels as buffer for sending requests & clean up examples
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
-rw-r--r--jsonrpc/src/server/mod.rs62
1 files changed, 31 insertions, 31 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 09850c5..86b1b31 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -16,8 +16,6 @@ use crate::{message, Error, PubSubRPCService, RPCService, Result};
use channel::Channel;
use response_queue::ResponseQueue;
-const CHANNEL_CAP: usize = 10;
-
pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request";
pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
@@ -49,7 +47,7 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: &Arc<Self>) {
+ pub fn start(self: &Arc<Self>) {
let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Accept loop stopped: {err}");
@@ -87,7 +85,9 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let conn = Arc::new(conn);
+
+ let (ch_tx, ch_rx) = async_channel::unbounded();
// Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
@@ -99,28 +99,37 @@ impl Server {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
- // close the subscription channel
+ // Close the connection subscription channel
chan.close();
};
+ let conn_cloned = conn.clone();
let queue_cloned = queue.clone();
- // Start listing to new notifications coming from rpc services
- // Push notifications as responses to the response queue
+ // Start listening for responses in the queue or new notifications
self.task_group.spawn(
async move {
loop {
- let nt = ch_rx.recv().await?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- queue_cloned.push(serde_json::json!(notification)).await;
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue_cloned.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn_cloned.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ // debug!("--> {notification}");
+ conn_cloned.send(serde_json::json!(notification)).await?;
+ }
+ }
}
},
on_complete,
@@ -138,21 +147,12 @@ impl Server {
};
let selfc = self.clone();
- // Spawn a new task and wait for either a new response in the response
- // queue or a new request coming from a connected client.
+ // Spawn a new task and wait for requests.
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), queue.recv()).await {
- Either::Left(msg) => {
- selfc
- .new_request(queue.clone(), channel.clone(), msg?)
- .await;
- }
- Either::Right(res) => {
- conn.send(res).await?;
- }
- }
+ let msg = conn.recv().await?;
+ selfc.new_request(queue.clone(), channel.clone(), msg).await;
}
},
on_complete,