aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-13 05:52:48 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-13 05:52:48 +0200
commit1c27f751c30196e2c421ae420dacbc4ff25f0fc7 (patch)
treee9a34bea9e6fd45d53a4ad1a7a4e75857ad2fe9a /jsonrpc/src
parentd6a280f69a6685d5b4da5366626fb76a27f0cc07 (diff)
jsonrpc: spread out comments and clean up
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/client/builder.rs10
-rw-r--r--jsonrpc/src/client/message_dispatcher.rs54
-rw-r--r--jsonrpc/src/client/mod.rs107
-rw-r--r--jsonrpc/src/client/subscriber.rs64
-rw-r--r--jsonrpc/src/lib.rs2
-rw-r--r--jsonrpc/src/server/channel.rs8
-rw-r--r--jsonrpc/src/server/mod.rs131
-rw-r--r--jsonrpc/src/server/pubsub_service.rs9
-rw-r--r--jsonrpc/src/server/response_queue.rs40
9 files changed, 265 insertions, 160 deletions
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs
index a287070..2263498 100644
--- a/jsonrpc/src/client/builder.rs
+++ b/jsonrpc/src/client/builder.rs
@@ -1,11 +1,11 @@
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
#[cfg(feature = "smol")]
use futures_rustls::rustls;
#[cfg(feature = "tokio")]
use tokio_rustls::rustls;
-use karyon_core::{async_runtime::lock::Mutex, async_util::TaskGroup};
+use karyon_core::async_util::TaskGroup;
use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
#[cfg(feature = "ws")]
@@ -16,7 +16,7 @@ use crate::codec::WsJsonCodec;
use crate::{codec::JsonCodec, Error, Result, TcpConfig};
-use super::Client;
+use super::{Client, MessageDispatcher, Subscriber};
const DEFAULT_TIMEOUT: u64 = 3000; // 3s
@@ -171,8 +171,8 @@ impl ClientBuilder {
let client = Arc::new(Client {
timeout: self.timeout,
conn,
- chans: Mutex::new(HashMap::new()),
- subscriptions: Mutex::new(HashMap::new()),
+ message_dispatcher: MessageDispatcher::new(),
+ subscriber: Subscriber::new(),
task_group: TaskGroup::new(),
});
client.start_background_receiving();
diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs
new file mode 100644
index 0000000..a803f6e
--- /dev/null
+++ b/jsonrpc/src/client/message_dispatcher.rs
@@ -0,0 +1,54 @@
+use std::collections::HashMap;
+
+use async_channel::{Receiver, Sender};
+
+use karyon_core::async_runtime::lock::Mutex;
+
+use crate::{message, Error, Result};
+
+use super::RequestID;
+
+const CHANNEL_CAP: usize = 10;
+
+/// Manages client requests
+pub(super) struct MessageDispatcher {
+ chans: Mutex<HashMap<RequestID, Sender<message::Response>>>,
+}
+
+impl MessageDispatcher {
+ /// Creates a new MessageDispatcher
+ pub(super) fn new() -> Self {
+ Self {
+ chans: Mutex::new(HashMap::new()),
+ }
+ }
+
+ /// Registers a new request with a given ID and returns a Receiver channel
+ /// to wait for the response.
+ pub(super) async fn register(&self, id: RequestID) -> Receiver<message::Response> {
+ let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
+ self.chans.lock().await.insert(id, tx);
+ rx
+ }
+
+ /// Unregisters the request with the provided ID
+ pub(super) async fn unregister(&self, id: &RequestID) {
+ self.chans.lock().await.remove(id);
+ }
+
+ /// Dispatches a response to the channel associated with the response's ID.
+ ///
+ /// If a channel is registered for the response's ID, the response is sent
+ /// through that channel. If no channel is found for the ID, returns an error.
+ pub(super) async fn dispatch(&self, res: message::Response) -> Result<()> {
+ if res.id.is_none() {
+ return Err(Error::InvalidMsg("Response id is none"));
+ }
+ let id: RequestID = serde_json::from_value(res.id.clone().unwrap())?;
+ let val = self.chans.lock().await.remove(&id);
+ match val {
+ Some(tx) => tx.send(res).await.map_err(Error::from),
+ None => Err(Error::InvalidMsg("Receive unknown message")),
+ }
+ }
+}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 3a4505c..95354d3 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -1,13 +1,13 @@
pub mod builder;
+mod message_dispatcher;
+mod subscriber;
-use std::{collections::HashMap, sync::Arc, time::Duration};
-
-use log::{debug, error, warn};
+use log::{debug, error};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::json;
+use std::{sync::Arc, time::Duration};
use karyon_core::{
- async_runtime::lock::Mutex,
async_util::{timeout, TaskGroup, TaskResult},
util::random_32,
};
@@ -18,21 +18,19 @@ use crate::{
Error, Result,
};
-const CHANNEL_CAP: usize = 10;
+use message_dispatcher::MessageDispatcher;
+use subscriber::Subscriber;
+pub use subscriber::Subscription;
-/// Type alias for a subscription to receive notifications.
-///
-/// The receiver channel is returned by the `subscribe` method to receive
-/// notifications from the server.
-pub type Subscription = async_channel::Receiver<serde_json::Value>;
+type RequestID = u32;
/// Represents an RPC client
pub struct Client {
conn: Conn<serde_json::Value>,
timeout: Option<u64>,
- chans: Mutex<HashMap<u32, async_channel::Sender<message::Response>>>,
- subscriptions: Mutex<HashMap<SubscriptionID, async_channel::Sender<serde_json::Value>>>,
+ message_dispatcher: MessageDispatcher,
task_group: TaskGroup,
+ subscriber: Subscriber,
}
impl Client {
@@ -67,10 +65,9 @@ impl Client {
None => return Err(Error::InvalidMsg("Invalid subscription id")),
};
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
- self.subscriptions.lock().await.insert(sub_id, ch_tx);
+ let rx = self.subscriber.subscribe(sub_id).await;
- Ok((sub_id, ch_rx))
+ Ok((sub_id, rx))
}
/// Unsubscribes from the provided method, waits for the response, and returns the result.
@@ -79,7 +76,7 @@ impl Client {
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
let _ = self.send_request(method, sub_id).await?;
- self.subscriptions.lock().await.remove(&sub_id);
+ self.subscriber.unsubscribe(&sub_id).await;
Ok(())
}
@@ -88,7 +85,7 @@ impl Client {
method: &str,
params: T,
) -> Result<message::Response> {
- let id = random_32();
+ let id: RequestID = random_32();
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
id: json!(id),
@@ -98,16 +95,24 @@ impl Client {
let req_json = serde_json::to_value(&request)?;
+ // Send the json request
self.conn.send(req_json).await?;
- let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
- self.chans.lock().await.insert(id, tx);
+ // Register a new request
+ let rx = self.message_dispatcher.register(id).await;
+
+ // Wait for the message dispatcher to send the response
+ let result = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?,
+ None => rx.recv().await,
+ };
- let response = match self.wait_for_response(rx).await {
+ let response = match result {
Ok(r) => r,
Err(err) => {
- self.chans.lock().await.remove(&id);
- return Err(err);
+ // Unregister the request if an error occurs
+ self.message_dispatcher.unregister(&id).await;
+ return Err(err.into());
}
};
@@ -115,6 +120,8 @@ impl Client {
return Err(Error::SubscribeError(error.code, error.message));
}
+ // It should be OK to unwrap here, as the message dispatcher checks
+ // for the response id.
if *response.id.as_ref().unwrap() != request.id {
return Err(Error::InvalidMsg("Invalid response id"));
}
@@ -123,28 +130,17 @@ impl Client {
Ok(response)
}
- async fn wait_for_response(
- &self,
- rx: async_channel::Receiver<message::Response>,
- ) -> Result<message::Response> {
- match self.timeout {
- Some(t) => timeout(Duration::from_millis(t), rx.recv())
- .await?
- .map_err(Error::from),
- None => rx.recv().await.map_err(Error::from),
- }
- }
-
fn start_background_receiving(self: &Arc<Self>) {
let selfc = self.clone();
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("background receiving stopped: {err}");
}
- // drop all subscription channels
- selfc.subscriptions.lock().await.clear();
+ // Drop all subscription
+ selfc.subscriber.drop_all().await;
};
let selfc = self.clone();
+ // Spawn a new task for listing to new coming messages.
self.task_group.spawn(
async move {
loop {
@@ -157,48 +153,23 @@ impl Client {
}
}
},
- on_failure,
+ on_complete,
);
}
async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> {
+ // Check if the received message is of type Response
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
debug!("<-- {res}");
- if res.id.is_none() {
- return Err(Error::InvalidMsg("Response id is none"));
- }
-
- let id: u32 = serde_json::from_value(res.id.clone().unwrap())?;
- match self.chans.lock().await.remove(&id) {
- Some(tx) => tx.send(res).await?,
- None => return Err(Error::InvalidMsg("Receive unkown message")),
- }
-
+ self.message_dispatcher.dispatch(res).await?;
return Ok(());
}
+ // Check if the received message is of type Notification
if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
debug!("<-- {nt}");
- let sub_result: message::NotificationResult = match nt.params {
- Some(ref p) => serde_json::from_value(p.clone())?,
- None => return Err(Error::InvalidMsg("Invalid notification msg")),
- };
-
- match self
- .subscriptions
- .lock()
- .await
- .get(&sub_result.subscription)
- {
- Some(s) => {
- s.send(sub_result.result.unwrap_or(json!(""))).await?;
- return Ok(());
- }
- None => {
- warn!("Receive unknown notification {}", sub_result.subscription);
- return Ok(());
- }
- }
+ self.subscriber.notify(nt).await?;
+ return Ok(());
}
error!("Receive unexpected msg: {msg}");
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
new file mode 100644
index 0000000..d47cc2a
--- /dev/null
+++ b/jsonrpc/src/client/subscriber.rs
@@ -0,0 +1,64 @@
+use std::collections::HashMap;
+
+use async_channel::{Receiver, Sender};
+use log::warn;
+use serde_json::json;
+
+use karyon_core::async_runtime::lock::Mutex;
+
+use crate::{
+ message::{Notification, NotificationResult, SubscriptionID},
+ Error, Result,
+};
+
+const CHANNEL_CAP: usize = 10;
+
+/// Manages subscriptions for the client.
+pub(super) struct Subscriber {
+ subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
+}
+
+/// Type alias for a subscription to receive notifications.
+///
+/// The receiver channel is returned by the `subscribe`
+pub type Subscription = Receiver<serde_json::Value>;
+
+impl Subscriber {
+ pub(super) fn new() -> Self {
+ Self {
+ subs: Mutex::new(HashMap::new()),
+ }
+ }
+
+ pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ self.subs.lock().await.insert(id, ch_tx);
+ ch_rx
+ }
+
+ pub(super) async fn drop_all(&self) {
+ self.subs.lock().await.clear();
+ }
+
+ /// Unsubscribe
+ pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
+ self.subs.lock().await.remove(id);
+ }
+
+ pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
+ let nt_res: NotificationResult = match nt.params {
+ Some(ref p) => serde_json::from_value(p.clone())?,
+ None => return Err(Error::InvalidMsg("Invalid notification msg")),
+ };
+ match self.subs.lock().await.get(&nt_res.subscription) {
+ Some(s) => {
+ s.send(nt_res.result.unwrap_or(json!(""))).await?;
+ Ok(())
+ }
+ None => {
+ warn!("Receive unknown notification {}", nt_res.subscription);
+ Ok(())
+ }
+ }
+ }
+}
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index b7b632e..23a6e08 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -10,7 +10,7 @@ pub use client::{builder::ClientBuilder, Client};
pub use error::{Error, Result};
pub use server::{
builder::ServerBuilder,
- channel::{ArcChannel, Channel, Subscription},
+ channel::{Channel, Subscription},
pubsub_service::{PubSubRPCMethod, PubSubRPCService},
service::{RPCMethod, RPCService},
Server,
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index a9d1002..bb62b9f 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -4,8 +4,6 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32};
use crate::{message::SubscriptionID, Error, Result};
-pub type ArcChannel = Arc<Channel>;
-
pub(crate) struct NewNotification {
pub sub_id: SubscriptionID,
pub result: serde_json::Value,
@@ -52,7 +50,7 @@ impl Subscription {
}
}
-/// Represents a channel for creating/removing subscriptions
+/// Represents a connection channel for creating/removing subscriptions
pub struct Channel {
chan: async_channel::Sender<NewNotification>,
subs: Mutex<Vec<SubscriptionID>>,
@@ -60,7 +58,7 @@ pub struct Channel {
impl Channel {
/// Creates a new [`Channel`]
- pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
+ pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
Arc::new(Self {
chan,
subs: Mutex::new(Vec::new()),
@@ -75,7 +73,7 @@ impl Channel {
sub
}
- /// Removes a subscription
+ /// Removes a [`Subscription`]
pub async fn remove_subscription(&self, id: &SubscriptionID) {
let mut subs = self.subs.lock().await;
let i = match subs.iter().position(|i| i == id) {
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7136be4..09850c5 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -1,24 +1,20 @@
pub mod builder;
pub mod channel;
pub mod pubsub_service;
+mod response_queue;
pub mod service;
-use std::{
- collections::{HashMap, VecDeque},
- sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
use log::{debug, error, trace, warn};
-use karyon_core::{
- async_runtime::lock::Mutex,
- async_util::{select, CondVar, Either, TaskGroup, TaskResult},
-};
+use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
use karyon_net::{Conn, Endpoint, Listener};
use crate::{message, Error, PubSubRPCService, RPCService, Result};
-use channel::{ArcChannel, Channel};
+use channel::Channel;
+use response_queue::ResponseQueue;
const CHANNEL_CAP: usize = 10;
@@ -27,21 +23,6 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
pub const INTERNAL_ERROR_MSG: &str = "Internal error";
-fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
- let err = message::Error {
- code,
- message: msg.to_string(),
- data: None,
- };
-
- message::Response {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- error: Some(err),
- result: None,
- id,
- }
-}
-
struct NewRequest {
srvc_name: String,
method_name: String,
@@ -53,42 +34,6 @@ enum SanityCheckResult {
ErrRes(message::Response),
}
-struct ResponseQueue<T> {
- queue: Mutex<VecDeque<T>>,
- condvar: CondVar,
-}
-
-impl<T> ResponseQueue<T> {
- fn new() -> Arc<Self> {
- Arc::new(Self {
- queue: Mutex::new(VecDeque::new()),
- condvar: CondVar::new(),
- })
- }
-
- /// Wait while the queue is empty, remove and return the item from the queue,
- /// panicking if empty (shouldn't happen)
- async fn recv(&self) -> T {
- let mut queue = self.queue.lock().await;
-
- while queue.is_empty() {
- queue = self.condvar.wait(queue).await;
- }
-
- match queue.pop_front() {
- Some(v) => v,
- None => unreachable!(),
- }
- }
-
- /// Push an item into the queue, notify all waiting tasks that the
- /// condvar has changed
- async fn push(&self, res: T) {
- self.queue.lock().await.push_back(res);
- self.condvar.broadcast();
- }
-}
-
/// Represents an RPC server
pub struct Server {
listener: Listener<serde_json::Value>,
@@ -105,13 +50,14 @@ impl Server {
/// Starts the RPC server
pub async fn start(self: &Arc<Self>) {
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Accept loop stopped: {err}");
}
};
let selfc = self.clone();
+ // Spawns a new task for each new incoming connection
self.task_group.spawn(
async move {
loop {
@@ -127,7 +73,7 @@ impl Server {
}
}
},
- on_failure,
+ on_complete,
);
}
@@ -142,12 +88,14 @@ impl Server {
debug!("Handle a new connection {endpoint}");
let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ // Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
+ // Create a response queue
let queue = ResponseQueue::new();
let chan = channel.clone();
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
@@ -156,6 +104,8 @@ impl Server {
};
let queue_cloned = queue.clone();
+ // Start listing to new notifications coming from rpc services
+ // Push notifications as responses to the response queue
self.task_group.spawn(
async move {
loop {
@@ -164,30 +114,32 @@ impl Server {
subscription: nt.sub_id,
result: Some(nt.result),
}));
- let response = message::Notification {
+ let notification = message::Notification {
jsonrpc: message::JSONRPC_VERSION.to_string(),
method: nt.method,
params,
};
- debug!("--> {response}");
- queue_cloned.push(serde_json::json!(response)).await;
+ debug!("--> {notification}");
+ queue_cloned.push(serde_json::json!(notification)).await;
}
},
- on_failure,
+ on_complete,
);
let chan = channel.clone();
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
} else {
warn!("Connection {} dropped", endpoint);
}
- // close the subscription channel when the connection dropped
+ // Close the subscription channel when the connection dropped
chan.close();
};
let selfc = self.clone();
+ // Spawn a new task and wait for either a new response in the response
+ // queue or a new request coming from a connected client.
self.task_group.spawn(
async move {
loop {
@@ -197,11 +149,13 @@ impl Server {
.new_request(queue.clone(), channel.clone(), msg?)
.await;
}
- Either::Right(res) => conn.send(res).await?,
+ Either::Right(res) => {
+ conn.send(res).await?;
+ }
}
}
},
- on_failure,
+ on_complete,
);
Ok(())
@@ -220,6 +174,7 @@ impl Server {
};
debug!("<-- {rpc_msg}");
+ // Parse the service name and its method
let srvc_method_str = rpc_msg.method.clone();
let srvc_method: Vec<&str> = srvc_method_str.split('.').collect();
if srvc_method.len() < 2 {
@@ -240,20 +195,22 @@ impl Server {
})
}
- /// Spawns a new task for handling a new request
+ /// Spawns a new task for handling the new request
async fn new_request(
self: &Arc<Self>,
queue: Arc<ResponseQueue<serde_json::Value>>,
- channel: ArcChannel,
+ channel: Arc<Channel>,
msg: serde_json::Value,
) {
trace!("--> new request {msg}");
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Failed to handle a request: {err}");
}
};
let selfc = self.clone();
+ // Spawns a new task for handling the new request, and push the
+ // response to the response queue.
self.task_group.spawn(
async move {
let response = selfc.handle_request(channel, msg).await;
@@ -261,14 +218,15 @@ impl Server {
queue.push(serde_json::json!(response)).await;
Ok(())
},
- on_failure,
+ on_complete,
);
}
- /// Handles a new request
+ /// Handles the new request, and returns an RPC Response that has either
+ /// an error or result
async fn handle_request(
&self,
- channel: ArcChannel,
+ channel: Arc<Channel>,
msg: serde_json::Value,
) -> message::Response {
let req = match self.sanity_check(msg) {
@@ -283,7 +241,9 @@ impl Server {
id: Some(req.msg.id.clone()),
};
+ // Check if the service exists in pubsub services list
if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
+ // Check if the method exists within the service
if let Some(method) = service.get_pubsub_method(&req.method_name) {
let name = format!("{}.{}", service.name(), req.method_name);
let params = req.msg.params.unwrap_or(serde_json::json!(()));
@@ -296,7 +256,9 @@ impl Server {
}
}
+ // Check if the service exists in services list
if let Some(service) = self.services.get(&req.srvc_name) {
+ // Check if the method exists within the service
if let Some(method) = service.get_method(&req.method_name) {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
response.result = match method(params).await {
@@ -337,3 +299,18 @@ impl Server {
}
}
}
+
+fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
+ let err = message::Error {
+ code,
+ message: msg.to_string(),
+ data: None,
+ };
+
+ message::Response {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ error: Some(err),
+ result: None,
+ id,
+ }
+}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 5b3b50b..08d1bbb 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -1,12 +1,12 @@
-use std::{future::Future, pin::Pin};
+use std::{future::Future, pin::Pin, sync::Arc};
use crate::Result;
-use super::channel::ArcChannel;
+use super::channel::Channel;
/// Represents the RPC method
pub type PubSubRPCMethod<'a> =
- Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
+ Box<dyn Fn(Arc<Channel>, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
type PubSubRPCMethodOutput<'a> =
Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
@@ -51,7 +51,8 @@ macro_rules! impl_pubsub_rpc_service {
match name {
$(
stringify!($m) => {
- Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| {
+ Some(Box::new(
+ move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
Box::pin(self.$m(chan, method, params))
}))
}
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs
new file mode 100644
index 0000000..0d70503
--- /dev/null
+++ b/jsonrpc/src/server/response_queue.rs
@@ -0,0 +1,40 @@
+use std::{collections::VecDeque, sync::Arc};
+
+use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
+
+/// A queue for handling responses
+pub(super) struct ResponseQueue<T> {
+ queue: Mutex<VecDeque<T>>,
+ condvar: CondVar,
+}
+
+impl<T: std::fmt::Debug> ResponseQueue<T> {
+ pub(super) fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(VecDeque::new()),
+ condvar: CondVar::new(),
+ })
+ }
+
+ /// Wait while the queue is empty, remove and return the item from the queue,
+ /// panicking if empty (shouldn't happen)
+ pub(super) async fn recv(&self) -> T {
+ let mut queue = self.queue.lock().await;
+
+ while queue.is_empty() {
+ queue = self.condvar.wait(queue).await;
+ }
+
+ match queue.pop_front() {
+ Some(v) => v,
+ None => unreachable!(),
+ }
+ }
+
+ /// Push an item into the queue, notify all waiting tasks that the
+ /// condvar has changed
+ pub(super) async fn push(&self, res: T) {
+ self.queue.lock().await.push_back(res);
+ self.condvar.signal();
+ }
+}