From a06239ccc5e21fd20182ec3046cf9174ecc58a43 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 30 May 2024 02:07:50 +0200 Subject: jsonrpc/server: use queue with condvar instead of async channels --- jsonrpc/examples/pubsub_client.rs | 68 ++++++++++++++-------------- jsonrpc/examples/tokio_server/Cargo.toml | 4 ++ jsonrpc/examples/tokio_server/src/main.rs | 45 +++++++++++++++++-- jsonrpc/src/client/mod.rs | 7 ++- jsonrpc/src/server/channel.rs | 10 +++-- jsonrpc/src/server/mod.rs | 75 +++++++++++++++++++++++++------ 6 files changed, 154 insertions(+), 55 deletions(-) (limited to 'jsonrpc') diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index fee2a26..830b32f 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -1,47 +1,47 @@ +use std::time::Duration; + use serde::{Deserialize, Serialize}; -use smol::stream::StreamExt; +use smol::Timer; use karyon_jsonrpc::Client; #[derive(Deserialize, Serialize, Debug)] struct Pong {} -fn main() { - env_logger::init(); - smol::future::block_on(async { - let client = Client::builder("tcp://127.0.0.1:6000") - .expect("Create client builder") - .build() - .await - .expect("Build a client"); - - let result: Pong = client +async fn run_client() { + let client = Client::builder("tcp://127.0.0.1:6000") + .expect("Create client builder") + .build() + .await + .expect("Build a client"); + + let clientc = client.clone(); + smol::spawn(async move {}).detach(); + + let (_, sub) = client + .subscribe("Calc.log_subscribe", ()) + .await + .expect("Subscribe to log_subscribe method"); + + smol::spawn(async move { + loop { + let _m = sub.recv().await.unwrap(); + } + }) + .detach(); + + loop { + Timer::after(Duration::from_secs(1)).await; + let _: Pong = clientc .call("Calc.ping", ()) .await .expect("Send ping request"); + } +} - println!("receive pong msg: {:?}", result); - - let (sub_id, sub) = client - .subscribe("Calc.log_subscribe", ()) - .await - .expect("Subscribe to log_subscribe method"); - - smol::spawn(async move { - sub.for_each(|m| { - println!("Receive new notification: {m}"); - }) - .await - }) - .detach(); - - smol::Timer::after(std::time::Duration::from_secs(5)).await; - - client - .unsubscribe("Calc.log_unsubscribe", sub_id) - .await - .expect("Unsubscribe from log_unsubscirbe method"); - - smol::Timer::after(std::time::Duration::from_secs(2)).await; +fn main() { + env_logger::init(); + smol::future::block_on(async { + smol::spawn(run_client()).await; }); } diff --git a/jsonrpc/examples/tokio_server/Cargo.toml b/jsonrpc/examples/tokio_server/Cargo.toml index 93d8a61..9ed681b 100644 --- a/jsonrpc/examples/tokio_server/Cargo.toml +++ b/jsonrpc/examples/tokio_server/Cargo.toml @@ -12,3 +12,7 @@ serde = { version = "1.0.202", features = ["derive"] } serde_json = "1.0.117" tokio = { version = "1.37.0", features = ["full"] } +[profile.release] +debug = true + + diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 41d4c74..3bb4871 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use karyon_jsonrpc::{rpc_impl, Error, Server}; +use karyon_jsonrpc::{ + message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, +}; struct Calc { version: String, @@ -39,18 +41,53 @@ impl Calc { } } +#[rpc_pubsub_impl] +impl Calc { + async fn log_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: Value, + ) -> Result { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if let Err(_) = sub.notify(serde_json::json!("Hello")).await { + break; + } + } + }); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: Value, + ) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + #[tokio::main] async fn main() { env_logger::init(); // Register the Calc service - let calc = Calc { + let calc = Arc::new(Calc { version: String::from("0.1"), - }; + }); // Creates a new server let server = Server::builder("ws://127.0.0.1:6000") .expect("Create a new server builder") - .service(Arc::new(calc)) + .service(calc.clone()) + .pubsub_service(calc) .build() .await .expect("start a new server"); diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 326252a..3a4505c 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -149,7 +149,12 @@ impl Client { async move { loop { let msg = selfc.conn.recv().await?; - selfc.handle_msg(msg).await?; + if let Err(err) = selfc.handle_msg(msg).await { + error!( + "Failed to handle a new received msg from the connection {} : {err}", + selfc.conn.peer_endpoint()? + ); + } } }, on_failure, diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index efcd344..a9d1002 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -59,7 +59,7 @@ pub struct Channel { } impl Channel { - /// Creates a new `Channel` + /// Creates a new [`Channel`] pub(crate) fn new(chan: async_channel::Sender) -> ArcChannel { Arc::new(Self { chan, @@ -67,7 +67,7 @@ impl Channel { }) } - /// Creates a new subscription + /// Creates a new [`Subscription`] pub async fn new_subscription(self: &Arc, method: &str) -> Subscription { let sub_id = random_32(); let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method); @@ -76,7 +76,7 @@ impl Channel { } /// Removes a subscription - pub async fn remove_subscription(self: &Arc, id: &SubscriptionID) { + pub async fn remove_subscription(&self, id: &SubscriptionID) { let mut subs = self.subs.lock().await; let i = match subs.iter().position(|i| i == id) { Some(i) => i, @@ -84,4 +84,8 @@ impl Channel { }; subs.remove(i); } + + pub fn close(&self) { + self.chan.close(); + } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7f28de2..7136be4 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -3,11 +3,17 @@ pub mod channel; pub mod pubsub_service; pub mod service; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ + async_runtime::lock::Mutex, + async_util::{select, CondVar, Either, TaskGroup, TaskResult}, +}; use karyon_net::{Conn, Endpoint, Listener}; use crate::{message, Error, PubSubRPCService, RPCService, Result}; @@ -47,6 +53,42 @@ enum SanityCheckResult { ErrRes(message::Response), } +struct ResponseQueue { + queue: Mutex>, + condvar: CondVar, +} + +impl ResponseQueue { + fn new() -> Arc { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.broadcast(); + } +} + /// Represents an RPC server pub struct Server { listener: Listener, @@ -99,20 +141,21 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); - // TODO Avoid depending on channels - let (tx, rx) = async_channel::bounded::(CHANNEL_CAP); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); let channel = Channel::new(ch_tx); + let queue = ResponseQueue::new(); + + let chan = channel.clone(); let on_failure = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } + // close the subscription channel + chan.close(); }; - let selfc = self.clone(); - let txc = tx.clone(); + let queue_cloned = queue.clone(); self.task_group.spawn( async move { loop { @@ -127,28 +170,34 @@ impl Server { params, }; debug!("--> {response}"); - txc.send(serde_json::to_value(response)?).await?; + queue_cloned.push(serde_json::json!(response)).await; } }, on_failure, ); + let chan = channel.clone(); let on_failure = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); } else { warn!("Connection {} dropped", endpoint); } + // close the subscription channel when the connection dropped + chan.close(); }; + let selfc = self.clone(); self.task_group.spawn( async move { loop { - match select(conn.recv(), rx.recv()).await { + match select(conn.recv(), queue.recv()).await { Either::Left(msg) => { - selfc.new_request(tx.clone(), channel.clone(), msg?).await; + selfc + .new_request(queue.clone(), channel.clone(), msg?) + .await; } - Either::Right(msg) => conn.send(msg?).await?, + Either::Right(res) => conn.send(res).await?, } } }, @@ -194,7 +243,7 @@ impl Server { /// Spawns a new task for handling a new request async fn new_request( self: &Arc, - sender: async_channel::Sender, + queue: Arc>, channel: ArcChannel, msg: serde_json::Value, ) { @@ -209,7 +258,7 @@ impl Server { async move { let response = selfc.handle_request(channel, msg).await; debug!("--> {response}"); - sender.send(serde_json::json!(response)).await?; + queue.push(serde_json::json!(response)).await; Ok(()) }, on_failure, -- cgit v1.2.3