aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-30 02:07:50 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-30 02:07:50 +0200
commita06239ccc5e21fd20182ec3046cf9174ecc58a43 (patch)
tree12bf68067ba377f8bb081e98c3e3a4ac4fcd13b7
parent34f528a3b4063b4a25915e60d7f22ee1fb2d1bd9 (diff)
jsonrpc/server: use queue with condvar instead of async channels
-rw-r--r--jsonrpc/examples/pubsub_client.rs68
-rw-r--r--jsonrpc/examples/tokio_server/Cargo.toml4
-rw-r--r--jsonrpc/examples/tokio_server/src/main.rs45
-rw-r--r--jsonrpc/src/client/mod.rs7
-rw-r--r--jsonrpc/src/server/channel.rs10
-rw-r--r--jsonrpc/src/server/mod.rs75
6 files changed, 154 insertions, 55 deletions
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
index fee2a26..830b32f 100644
--- a/jsonrpc/examples/pubsub_client.rs
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -1,47 +1,47 @@
+use std::time::Duration;
+
use serde::{Deserialize, Serialize};
-use smol::stream::StreamExt;
+use smol::Timer;
use karyon_jsonrpc::Client;
#[derive(Deserialize, Serialize, Debug)]
struct Pong {}
-fn main() {
- env_logger::init();
- smol::future::block_on(async {
- let client = Client::builder("tcp://127.0.0.1:6000")
- .expect("Create client builder")
- .build()
- .await
- .expect("Build a client");
-
- let result: Pong = client
+async fn run_client() {
+ let client = Client::builder("tcp://127.0.0.1:6000")
+ .expect("Create client builder")
+ .build()
+ .await
+ .expect("Build a client");
+
+ let clientc = client.clone();
+ smol::spawn(async move {}).detach();
+
+ let (_, sub) = client
+ .subscribe("Calc.log_subscribe", ())
+ .await
+ .expect("Subscribe to log_subscribe method");
+
+ smol::spawn(async move {
+ loop {
+ let _m = sub.recv().await.unwrap();
+ }
+ })
+ .detach();
+
+ loop {
+ Timer::after(Duration::from_secs(1)).await;
+ let _: Pong = clientc
.call("Calc.ping", ())
.await
.expect("Send ping request");
+ }
+}
- println!("receive pong msg: {:?}", result);
-
- let (sub_id, sub) = client
- .subscribe("Calc.log_subscribe", ())
- .await
- .expect("Subscribe to log_subscribe method");
-
- smol::spawn(async move {
- sub.for_each(|m| {
- println!("Receive new notification: {m}");
- })
- .await
- })
- .detach();
-
- smol::Timer::after(std::time::Duration::from_secs(5)).await;
-
- client
- .unsubscribe("Calc.log_unsubscribe", sub_id)
- .await
- .expect("Unsubscribe from log_unsubscirbe method");
-
- smol::Timer::after(std::time::Duration::from_secs(2)).await;
+fn main() {
+ env_logger::init();
+ smol::future::block_on(async {
+ smol::spawn(run_client()).await;
});
}
diff --git a/jsonrpc/examples/tokio_server/Cargo.toml b/jsonrpc/examples/tokio_server/Cargo.toml
index 93d8a61..9ed681b 100644
--- a/jsonrpc/examples/tokio_server/Cargo.toml
+++ b/jsonrpc/examples/tokio_server/Cargo.toml
@@ -12,3 +12,7 @@ serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.37.0", features = ["full"] }
+[profile.release]
+debug = true
+
+
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index 41d4c74..3bb4871 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use karyon_jsonrpc::{rpc_impl, Error, Server};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server,
+};
struct Calc {
version: String,
@@ -39,18 +41,53 @@ impl Calc {
}
}
+#[rpc_pubsub_impl]
+impl Calc {
+ async fn log_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: Value,
+ ) -> Result<Value, Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+ if let Err(_) = sub.notify(serde_json::json!("Hello")).await {
+ break;
+ }
+ }
+ });
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn log_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: Value,
+ ) -> Result<Value, Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
#[tokio::main]
async fn main() {
env_logger::init();
// Register the Calc service
- let calc = Calc {
+ let calc = Arc::new(Calc {
version: String::from("0.1"),
- };
+ });
// Creates a new server
let server = Server::builder("ws://127.0.0.1:6000")
.expect("Create a new server builder")
- .service(Arc::new(calc))
+ .service(calc.clone())
+ .pubsub_service(calc)
.build()
.await
.expect("start a new server");
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 326252a..3a4505c 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -149,7 +149,12 @@ impl Client {
async move {
loop {
let msg = selfc.conn.recv().await?;
- selfc.handle_msg(msg).await?;
+ if let Err(err) = selfc.handle_msg(msg).await {
+ error!(
+ "Failed to handle a new received msg from the connection {} : {err}",
+ selfc.conn.peer_endpoint()?
+ );
+ }
}
},
on_failure,
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index efcd344..a9d1002 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -59,7 +59,7 @@ pub struct Channel {
}
impl Channel {
- /// Creates a new `Channel`
+ /// Creates a new [`Channel`]
pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
Arc::new(Self {
chan,
@@ -67,7 +67,7 @@ impl Channel {
})
}
- /// Creates a new subscription
+ /// Creates a new [`Subscription`]
pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {
let sub_id = random_32();
let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method);
@@ -76,7 +76,7 @@ impl Channel {
}
/// Removes a subscription
- pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) {
+ pub async fn remove_subscription(&self, id: &SubscriptionID) {
let mut subs = self.subs.lock().await;
let i = match subs.iter().position(|i| i == id) {
Some(i) => i,
@@ -84,4 +84,8 @@ impl Channel {
};
subs.remove(i);
}
+
+ pub fn close(&self) {
+ self.chan.close();
+ }
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7f28de2..7136be4 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -3,11 +3,17 @@ pub mod channel;
pub mod pubsub_service;
pub mod service;
-use std::{collections::HashMap, sync::Arc};
+use std::{
+ collections::{HashMap, VecDeque},
+ sync::Arc,
+};
use log::{debug, error, trace, warn};
-use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
+use karyon_core::{
+ async_runtime::lock::Mutex,
+ async_util::{select, CondVar, Either, TaskGroup, TaskResult},
+};
use karyon_net::{Conn, Endpoint, Listener};
use crate::{message, Error, PubSubRPCService, RPCService, Result};
@@ -47,6 +53,42 @@ enum SanityCheckResult {
ErrRes(message::Response),
}
+struct ResponseQueue<T> {
+ queue: Mutex<VecDeque<T>>,
+ condvar: CondVar,
+}
+
+impl<T> ResponseQueue<T> {
+ fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(VecDeque::new()),
+ condvar: CondVar::new(),
+ })
+ }
+
+ /// Wait while the queue is empty, remove and return the item from the queue,
+ /// panicking if empty (shouldn't happen)
+ async fn recv(&self) -> T {
+ let mut queue = self.queue.lock().await;
+
+ while queue.is_empty() {
+ queue = self.condvar.wait(queue).await;
+ }
+
+ match queue.pop_front() {
+ Some(v) => v,
+ None => unreachable!(),
+ }
+ }
+
+ /// Push an item into the queue, notify all waiting tasks that the
+ /// condvar has changed
+ async fn push(&self, res: T) {
+ self.queue.lock().await.push_back(res);
+ self.condvar.broadcast();
+ }
+}
+
/// Represents an RPC server
pub struct Server {
listener: Listener<serde_json::Value>,
@@ -99,20 +141,21 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
- // TODO Avoid depending on channels
- let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP);
-
let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
let channel = Channel::new(ch_tx);
+ let queue = ResponseQueue::new();
+
+ let chan = channel.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
+ // close the subscription channel
+ chan.close();
};
- let selfc = self.clone();
- let txc = tx.clone();
+ let queue_cloned = queue.clone();
self.task_group.spawn(
async move {
loop {
@@ -127,28 +170,34 @@ impl Server {
params,
};
debug!("--> {response}");
- txc.send(serde_json::to_value(response)?).await?;
+ queue_cloned.push(serde_json::json!(response)).await;
}
},
on_failure,
);
+ let chan = channel.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
} else {
warn!("Connection {} dropped", endpoint);
}
+ // close the subscription channel when the connection dropped
+ chan.close();
};
+ let selfc = self.clone();
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), rx.recv()).await {
+ match select(conn.recv(), queue.recv()).await {
Either::Left(msg) => {
- selfc.new_request(tx.clone(), channel.clone(), msg?).await;
+ selfc
+ .new_request(queue.clone(), channel.clone(), msg?)
+ .await;
}
- Either::Right(msg) => conn.send(msg?).await?,
+ Either::Right(res) => conn.send(res).await?,
}
}
},
@@ -194,7 +243,7 @@ impl Server {
/// Spawns a new task for handling a new request
async fn new_request(
self: &Arc<Self>,
- sender: async_channel::Sender<serde_json::Value>,
+ queue: Arc<ResponseQueue<serde_json::Value>>,
channel: ArcChannel,
msg: serde_json::Value,
) {
@@ -209,7 +258,7 @@ impl Server {
async move {
let response = selfc.handle_request(channel, msg).await;
debug!("--> {response}");
- sender.send(serde_json::json!(response)).await?;
+ queue.push(serde_json::json!(response)).await;
Ok(())
},
on_failure,