From 3429caa87699d986f799a11f6e0f4526e723b655 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 14 Jun 2024 22:49:53 +0200 Subject: jsonrpc: client use unbounded channels as buffer for sending requests & clean up examples --- Cargo.lock | 4 +- jsonrpc/Cargo.toml | 2 +- jsonrpc/README.md | 2 +- jsonrpc/examples/client.rs | 45 ++++++++++++---------- jsonrpc/examples/pubsub_client.rs | 4 +- jsonrpc/examples/pubsub_server.rs | 7 ++-- jsonrpc/examples/server.rs | 2 +- jsonrpc/examples/tokio_server/src/main.rs | 2 +- jsonrpc/src/client/message_dispatcher.rs | 4 +- jsonrpc/src/client/mod.rs | 2 +- jsonrpc/src/client/subscriber.rs | 4 +- jsonrpc/src/server/channel.rs | 3 +- jsonrpc/src/server/mod.rs | 62 +++++++++++++++---------------- p2p/examples/monitor/src/main.rs | 2 +- 14 files changed, 76 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4944be2..ef2cb1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1559,9 +1559,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index dea980b..a174040 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -7,7 +7,7 @@ homepage.workspace = true repository.workspace = true license.workspace = true authors.workspace = true -readme="README.md" +readme = "README.md" [features] diff --git a/jsonrpc/README.md b/jsonrpc/README.md index b4ee580..8127727 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -94,7 +94,7 @@ async { .expect("build the server"); // Starts the server - server.start().await; + server.start(); smol::Timer::after(Duration::MAX).await; }; diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs index 662aacb..0f87ecb 100644 --- a/jsonrpc/examples/client.rs +++ b/jsonrpc/examples/client.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use log::info; use serde::{Deserialize, Serialize}; use smol::Timer; @@ -21,29 +22,35 @@ fn main() { .expect("Create client builder") .build() .await - .unwrap(); - - let clientc = client.clone(); - smol::spawn(async move { - loop { - Timer::after(Duration::from_millis(500)).await; - let result: Pong = clientc.call("Calc.ping", ()).await.unwrap(); - println!("ping msg result: {:?}", result); - } - }) - .detach(); + .expect("Create rpc client"); let params = Req { x: 10, y: 7 }; - let result: u32 = client.call("Calc.add", params).await.unwrap(); - println!("result {result}"); + let result: u32 = client + .call("Calc.add", params) + .await + .expect("Call Calc.add method"); + info!("Add result: {result}"); let params = Req { x: 10, y: 7 }; - let result: u32 = client.call("Calc.sub", params).await.unwrap(); - println!("result {result}"); - - let result: String = client.call("Calc.version", ()).await.unwrap(); - println!("result {result}"); + let result: u32 = client + .call("Calc.sub", params) + .await + .expect("Call Calc.sub method"); + info!("Sub result: {result}"); - Timer::after(Duration::from_secs(10)).await; + let result: String = client + .call("Calc.version", ()) + .await + .expect("Call Calc.version method"); + info!("Version result: {result}"); + + loop { + Timer::after(Duration::from_millis(100)).await; + let result: Pong = client + .call("Calc.ping", ()) + .await + .expect("Call Calc.ping method"); + info!("Ping result: {:?}", result); + } }); } diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index 830b32f..73570b8 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use log::info; use serde::{Deserialize, Serialize}; use smol::Timer; @@ -25,7 +26,8 @@ async fn run_client() { smol::spawn(async move { loop { - let _m = sub.recv().await.unwrap(); + let m = sub.recv().await.expect("Receive new log msg"); + info!("Receive new log {m}"); } }) .detach(); diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs index 74eb907..40ea756 100644 --- a/jsonrpc/examples/pubsub_server.rs +++ b/jsonrpc/examples/pubsub_server.rs @@ -1,5 +1,6 @@ use std::{sync::Arc, time::Duration}; +use log::error; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -36,9 +37,9 @@ impl Calc { let sub_id = sub.id.clone(); smol::spawn(async move { loop { - smol::Timer::after(std::time::Duration::from_secs(1)).await; + sleep(Duration::from_millis(500)).await; if let Err(err) = sub.notify(serde_json::json!("Hello")).await { - println!("Error send notification {err}"); + error!("Error send notification {err}"); break; } } @@ -75,7 +76,7 @@ fn main() { .expect("Build a new server"); // Start the server - server.start().await; + server.start(); sleep(Duration::MAX).await; }); diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 470bd02..31e65dd 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -57,7 +57,7 @@ fn main() { .expect("start a new server"); // Start the server - server.start().await; + server.start(); sleep(Duration::MAX).await; }); diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 0a47fda..a9b2b32 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -91,7 +91,7 @@ async fn main() { .expect("start a new server"); // Start the server - server.start().await; + server.start(); tokio::time::sleep(Duration::MAX).await; } diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs index a803f6e..aa47cec 100644 --- a/jsonrpc/src/client/message_dispatcher.rs +++ b/jsonrpc/src/client/message_dispatcher.rs @@ -8,8 +8,6 @@ use crate::{message, Error, Result}; use super::RequestID; -const CHANNEL_CAP: usize = 10; - /// Manages client requests pub(super) struct MessageDispatcher { chans: Mutex>>, @@ -26,7 +24,7 @@ impl MessageDispatcher { /// Registers a new request with a given ID and returns a Receiver channel /// to wait for the response. pub(super) async fn register(&self, id: RequestID) -> Receiver { - let (tx, rx) = async_channel::bounded(CHANNEL_CAP); + let (tx, rx) = async_channel::unbounded(); self.chans.lock().await.insert(id, tx); rx } diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 1225f13..9c07509 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -147,7 +147,7 @@ impl Client { let msg = selfc.conn.recv().await?; if let Err(err) = selfc.handle_msg(msg).await { error!( - "Failed to handle a new received msg from the connection {} : {err}", + "Handle a msg from the endpoint {} : {err}", selfc.conn.peer_endpoint()? ); } diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs index d47cc2a..168f16e 100644 --- a/jsonrpc/src/client/subscriber.rs +++ b/jsonrpc/src/client/subscriber.rs @@ -11,8 +11,6 @@ use crate::{ Error, Result, }; -const CHANNEL_CAP: usize = 10; - /// Manages subscriptions for the client. pub(super) struct Subscriber { subs: Mutex>>, @@ -31,7 +29,7 @@ impl Subscriber { } pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver { - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let (ch_tx, ch_rx) = async_channel::unbounded(); self.subs.lock().await.insert(id, ch_tx); ch_rx } diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 9278c8c..b5c9184 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -4,6 +4,7 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32}; use crate::{message::SubscriptionID, Error, Result}; +#[derive(Debug)] pub(crate) struct NewNotification { pub sub_id: SubscriptionID, pub result: serde_json::Value, @@ -20,7 +21,7 @@ pub struct Subscription { } impl Subscription { - /// Creates a new `Subscription` + /// Creates a new [`Subscription`] fn new( parent: Arc, id: SubscriptionID, diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 09850c5..86b1b31 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -16,8 +16,6 @@ use crate::{message, Error, PubSubRPCService, RPCService, Result}; use channel::Channel; use response_queue::ResponseQueue; -const CHANNEL_CAP: usize = 10; - pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; @@ -49,7 +47,7 @@ impl Server { } /// Starts the RPC server - pub async fn start(self: &Arc) { + pub fn start(self: &Arc) { let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Accept loop stopped: {err}"); @@ -87,7 +85,9 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let conn = Arc::new(conn); + + let (ch_tx, ch_rx) = async_channel::unbounded(); // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); @@ -99,28 +99,37 @@ impl Server { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } - // close the subscription channel + // Close the connection subscription channel chan.close(); }; + let conn_cloned = conn.clone(); let queue_cloned = queue.clone(); - // Start listing to new notifications coming from rpc services - // Push notifications as responses to the response queue + // Start listening for responses in the queue or new notifications self.task_group.spawn( async move { loop { - let nt = ch_rx.recv().await?; - let params = Some(serde_json::json!(message::NotificationResult { - subscription: nt.sub_id, - result: Some(nt.result), - })); - let notification = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: nt.method, - params, - }; - debug!("--> {notification}"); - queue_cloned.push(serde_json::json!(notification)).await; + // The select function will prioritize the first future if both futures are ready. + // This gives priority to the responses in the response queue. + match select(queue_cloned.recv(), ch_rx.recv()).await { + Either::Left(res) => { + conn_cloned.send(res).await?; + } + Either::Right(notification) => { + let nt = notification?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let notification = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + // debug!("--> {notification}"); + conn_cloned.send(serde_json::json!(notification)).await?; + } + } } }, on_complete, @@ -138,21 +147,12 @@ impl Server { }; let selfc = self.clone(); - // Spawn a new task and wait for either a new response in the response - // queue or a new request coming from a connected client. + // Spawn a new task and wait for requests. self.task_group.spawn( async move { loop { - match select(conn.recv(), queue.recv()).await { - Either::Left(msg) => { - selfc - .new_request(queue.clone(), channel.clone(), msg?) - .await; - } - Either::Right(res) => { - conn.send(res).await?; - } - } + let msg = conn.recv().await?; + selfc.new_request(queue.clone(), channel.clone(), msg).await; } }, on_complete, diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs index 990f8d2..78ada48 100644 --- a/p2p/examples/monitor/src/main.rs +++ b/p2p/examples/monitor/src/main.rs @@ -83,7 +83,7 @@ fn main() { .expect("Build rpc server"); // Run the RPC server - server.start().await; + server.start(); // Run the RPC Service service.run().await.expect("Run monitor rpc service"); -- cgit v1.2.3