aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock4
-rw-r--r--jsonrpc/Cargo.toml2
-rw-r--r--jsonrpc/README.md2
-rw-r--r--jsonrpc/examples/client.rs45
-rw-r--r--jsonrpc/examples/pubsub_client.rs4
-rw-r--r--jsonrpc/examples/pubsub_server.rs7
-rw-r--r--jsonrpc/examples/server.rs2
-rw-r--r--jsonrpc/examples/tokio_server/src/main.rs2
-rw-r--r--jsonrpc/src/client/message_dispatcher.rs4
-rw-r--r--jsonrpc/src/client/mod.rs2
-rw-r--r--jsonrpc/src/client/subscriber.rs4
-rw-r--r--jsonrpc/src/server/channel.rs3
-rw-r--r--jsonrpc/src/server/mod.rs62
-rw-r--r--p2p/examples/monitor/src/main.rs2
14 files changed, 76 insertions, 69 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4944be2..ef2cb1c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1559,9 +1559,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
-version = "0.12.2"
+version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
+checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml
index dea980b..a174040 100644
--- a/jsonrpc/Cargo.toml
+++ b/jsonrpc/Cargo.toml
@@ -7,7 +7,7 @@ homepage.workspace = true
repository.workspace = true
license.workspace = true
authors.workspace = true
-readme="README.md"
+readme = "README.md"
[features]
diff --git a/jsonrpc/README.md b/jsonrpc/README.md
index b4ee580..8127727 100644
--- a/jsonrpc/README.md
+++ b/jsonrpc/README.md
@@ -94,7 +94,7 @@ async {
.expect("build the server");
// Starts the server
- server.start().await;
+ server.start();
smol::Timer::after(Duration::MAX).await;
};
diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs
index 662aacb..0f87ecb 100644
--- a/jsonrpc/examples/client.rs
+++ b/jsonrpc/examples/client.rs
@@ -1,5 +1,6 @@
use std::time::Duration;
+use log::info;
use serde::{Deserialize, Serialize};
use smol::Timer;
@@ -21,29 +22,35 @@ fn main() {
.expect("Create client builder")
.build()
.await
- .unwrap();
-
- let clientc = client.clone();
- smol::spawn(async move {
- loop {
- Timer::after(Duration::from_millis(500)).await;
- let result: Pong = clientc.call("Calc.ping", ()).await.unwrap();
- println!("ping msg result: {:?}", result);
- }
- })
- .detach();
+ .expect("Create rpc client");
let params = Req { x: 10, y: 7 };
- let result: u32 = client.call("Calc.add", params).await.unwrap();
- println!("result {result}");
+ let result: u32 = client
+ .call("Calc.add", params)
+ .await
+ .expect("Call Calc.add method");
+ info!("Add result: {result}");
let params = Req { x: 10, y: 7 };
- let result: u32 = client.call("Calc.sub", params).await.unwrap();
- println!("result {result}");
-
- let result: String = client.call("Calc.version", ()).await.unwrap();
- println!("result {result}");
+ let result: u32 = client
+ .call("Calc.sub", params)
+ .await
+ .expect("Call Calc.sub method");
+ info!("Sub result: {result}");
- Timer::after(Duration::from_secs(10)).await;
+ let result: String = client
+ .call("Calc.version", ())
+ .await
+ .expect("Call Calc.version method");
+ info!("Version result: {result}");
+
+ loop {
+ Timer::after(Duration::from_millis(100)).await;
+ let result: Pong = client
+ .call("Calc.ping", ())
+ .await
+ .expect("Call Calc.ping method");
+ info!("Ping result: {:?}", result);
+ }
});
}
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
index 830b32f..73570b8 100644
--- a/jsonrpc/examples/pubsub_client.rs
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -1,5 +1,6 @@
use std::time::Duration;
+use log::info;
use serde::{Deserialize, Serialize};
use smol::Timer;
@@ -25,7 +26,8 @@ async fn run_client() {
smol::spawn(async move {
loop {
- let _m = sub.recv().await.unwrap();
+ let m = sub.recv().await.expect("Receive new log msg");
+ info!("Receive new log {m}");
}
})
.detach();
diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs
index 74eb907..40ea756 100644
--- a/jsonrpc/examples/pubsub_server.rs
+++ b/jsonrpc/examples/pubsub_server.rs
@@ -1,5 +1,6 @@
use std::{sync::Arc, time::Duration};
+use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -36,9 +37,9 @@ impl Calc {
let sub_id = sub.id.clone();
smol::spawn(async move {
loop {
- smol::Timer::after(std::time::Duration::from_secs(1)).await;
+ sleep(Duration::from_millis(500)).await;
if let Err(err) = sub.notify(serde_json::json!("Hello")).await {
- println!("Error send notification {err}");
+ error!("Error send notification {err}");
break;
}
}
@@ -75,7 +76,7 @@ fn main() {
.expect("Build a new server");
// Start the server
- server.start().await;
+ server.start();
sleep(Duration::MAX).await;
});
diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs
index 470bd02..31e65dd 100644
--- a/jsonrpc/examples/server.rs
+++ b/jsonrpc/examples/server.rs
@@ -57,7 +57,7 @@ fn main() {
.expect("start a new server");
// Start the server
- server.start().await;
+ server.start();
sleep(Duration::MAX).await;
});
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index 0a47fda..a9b2b32 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -91,7 +91,7 @@ async fn main() {
.expect("start a new server");
// Start the server
- server.start().await;
+ server.start();
tokio::time::sleep(Duration::MAX).await;
}
diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs
index a803f6e..aa47cec 100644
--- a/jsonrpc/src/client/message_dispatcher.rs
+++ b/jsonrpc/src/client/message_dispatcher.rs
@@ -8,8 +8,6 @@ use crate::{message, Error, Result};
use super::RequestID;
-const CHANNEL_CAP: usize = 10;
-
/// Manages client requests
pub(super) struct MessageDispatcher {
chans: Mutex<HashMap<RequestID, Sender<message::Response>>>,
@@ -26,7 +24,7 @@ impl MessageDispatcher {
/// Registers a new request with a given ID and returns a Receiver channel
/// to wait for the response.
pub(super) async fn register(&self, id: RequestID) -> Receiver<message::Response> {
- let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
+ let (tx, rx) = async_channel::unbounded();
self.chans.lock().await.insert(id, tx);
rx
}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 1225f13..9c07509 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -147,7 +147,7 @@ impl Client {
let msg = selfc.conn.recv().await?;
if let Err(err) = selfc.handle_msg(msg).await {
error!(
- "Failed to handle a new received msg from the connection {} : {err}",
+ "Handle a msg from the endpoint {} : {err}",
selfc.conn.peer_endpoint()?
);
}
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
index d47cc2a..168f16e 100644
--- a/jsonrpc/src/client/subscriber.rs
+++ b/jsonrpc/src/client/subscriber.rs
@@ -11,8 +11,6 @@ use crate::{
Error, Result,
};
-const CHANNEL_CAP: usize = 10;
-
/// Manages subscriptions for the client.
pub(super) struct Subscriber {
subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
@@ -31,7 +29,7 @@ impl Subscriber {
}
pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let (ch_tx, ch_rx) = async_channel::unbounded();
self.subs.lock().await.insert(id, ch_tx);
ch_rx
}
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 9278c8c..b5c9184 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -4,6 +4,7 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32};
use crate::{message::SubscriptionID, Error, Result};
+#[derive(Debug)]
pub(crate) struct NewNotification {
pub sub_id: SubscriptionID,
pub result: serde_json::Value,
@@ -20,7 +21,7 @@ pub struct Subscription {
}
impl Subscription {
- /// Creates a new `Subscription`
+ /// Creates a new [`Subscription`]
fn new(
parent: Arc<Channel>,
id: SubscriptionID,
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 09850c5..86b1b31 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -16,8 +16,6 @@ use crate::{message, Error, PubSubRPCService, RPCService, Result};
use channel::Channel;
use response_queue::ResponseQueue;
-const CHANNEL_CAP: usize = 10;
-
pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request";
pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
@@ -49,7 +47,7 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: &Arc<Self>) {
+ pub fn start(self: &Arc<Self>) {
let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Accept loop stopped: {err}");
@@ -87,7 +85,9 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let conn = Arc::new(conn);
+
+ let (ch_tx, ch_rx) = async_channel::unbounded();
// Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
@@ -99,28 +99,37 @@ impl Server {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
- // close the subscription channel
+ // Close the connection subscription channel
chan.close();
};
+ let conn_cloned = conn.clone();
let queue_cloned = queue.clone();
- // Start listing to new notifications coming from rpc services
- // Push notifications as responses to the response queue
+ // Start listening for responses in the queue or new notifications
self.task_group.spawn(
async move {
loop {
- let nt = ch_rx.recv().await?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- queue_cloned.push(serde_json::json!(notification)).await;
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue_cloned.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn_cloned.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ // debug!("--> {notification}");
+ conn_cloned.send(serde_json::json!(notification)).await?;
+ }
+ }
}
},
on_complete,
@@ -138,21 +147,12 @@ impl Server {
};
let selfc = self.clone();
- // Spawn a new task and wait for either a new response in the response
- // queue or a new request coming from a connected client.
+ // Spawn a new task and wait for requests.
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), queue.recv()).await {
- Either::Left(msg) => {
- selfc
- .new_request(queue.clone(), channel.clone(), msg?)
- .await;
- }
- Either::Right(res) => {
- conn.send(res).await?;
- }
- }
+ let msg = conn.recv().await?;
+ selfc.new_request(queue.clone(), channel.clone(), msg).await;
}
},
on_complete,
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
index 990f8d2..78ada48 100644
--- a/p2p/examples/monitor/src/main.rs
+++ b/p2p/examples/monitor/src/main.rs
@@ -83,7 +83,7 @@ fn main() {
.expect("Build rpc server");
// Run the RPC server
- server.start().await;
+ server.start();
// Run the RPC Service
service.run().await.expect("Run monitor rpc service");