aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r--jsonrpc/src/server/channel.rs8
-rw-r--r--jsonrpc/src/server/mod.rs131
-rw-r--r--jsonrpc/src/server/pubsub_service.rs9
-rw-r--r--jsonrpc/src/server/response_queue.rs40
4 files changed, 102 insertions, 86 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index a9d1002..bb62b9f 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -4,8 +4,6 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32};
use crate::{message::SubscriptionID, Error, Result};
-pub type ArcChannel = Arc<Channel>;
-
pub(crate) struct NewNotification {
pub sub_id: SubscriptionID,
pub result: serde_json::Value,
@@ -52,7 +50,7 @@ impl Subscription {
}
}
-/// Represents a channel for creating/removing subscriptions
+/// Represents a connection channel for creating/removing subscriptions
pub struct Channel {
chan: async_channel::Sender<NewNotification>,
subs: Mutex<Vec<SubscriptionID>>,
@@ -60,7 +58,7 @@ pub struct Channel {
impl Channel {
/// Creates a new [`Channel`]
- pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
+ pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
Arc::new(Self {
chan,
subs: Mutex::new(Vec::new()),
@@ -75,7 +73,7 @@ impl Channel {
sub
}
- /// Removes a subscription
+ /// Removes a [`Subscription`]
pub async fn remove_subscription(&self, id: &SubscriptionID) {
let mut subs = self.subs.lock().await;
let i = match subs.iter().position(|i| i == id) {
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7136be4..09850c5 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -1,24 +1,20 @@
pub mod builder;
pub mod channel;
pub mod pubsub_service;
+mod response_queue;
pub mod service;
-use std::{
- collections::{HashMap, VecDeque},
- sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
use log::{debug, error, trace, warn};
-use karyon_core::{
- async_runtime::lock::Mutex,
- async_util::{select, CondVar, Either, TaskGroup, TaskResult},
-};
+use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
use karyon_net::{Conn, Endpoint, Listener};
use crate::{message, Error, PubSubRPCService, RPCService, Result};
-use channel::{ArcChannel, Channel};
+use channel::Channel;
+use response_queue::ResponseQueue;
const CHANNEL_CAP: usize = 10;
@@ -27,21 +23,6 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
pub const INTERNAL_ERROR_MSG: &str = "Internal error";
-fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
- let err = message::Error {
- code,
- message: msg.to_string(),
- data: None,
- };
-
- message::Response {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- error: Some(err),
- result: None,
- id,
- }
-}
-
struct NewRequest {
srvc_name: String,
method_name: String,
@@ -53,42 +34,6 @@ enum SanityCheckResult {
ErrRes(message::Response),
}
-struct ResponseQueue<T> {
- queue: Mutex<VecDeque<T>>,
- condvar: CondVar,
-}
-
-impl<T> ResponseQueue<T> {
- fn new() -> Arc<Self> {
- Arc::new(Self {
- queue: Mutex::new(VecDeque::new()),
- condvar: CondVar::new(),
- })
- }
-
- /// Wait while the queue is empty, remove and return the item from the queue,
- /// panicking if empty (shouldn't happen)
- async fn recv(&self) -> T {
- let mut queue = self.queue.lock().await;
-
- while queue.is_empty() {
- queue = self.condvar.wait(queue).await;
- }
-
- match queue.pop_front() {
- Some(v) => v,
- None => unreachable!(),
- }
- }
-
- /// Push an item into the queue, notify all waiting tasks that the
- /// condvar has changed
- async fn push(&self, res: T) {
- self.queue.lock().await.push_back(res);
- self.condvar.broadcast();
- }
-}
-
/// Represents an RPC server
pub struct Server {
listener: Listener<serde_json::Value>,
@@ -105,13 +50,14 @@ impl Server {
/// Starts the RPC server
pub async fn start(self: &Arc<Self>) {
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Accept loop stopped: {err}");
}
};
let selfc = self.clone();
+ // Spawns a new task for each new incoming connection
self.task_group.spawn(
async move {
loop {
@@ -127,7 +73,7 @@ impl Server {
}
}
},
- on_failure,
+ on_complete,
);
}
@@ -142,12 +88,14 @@ impl Server {
debug!("Handle a new connection {endpoint}");
let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ // Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
+ // Create a response queue
let queue = ResponseQueue::new();
let chan = channel.clone();
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
@@ -156,6 +104,8 @@ impl Server {
};
let queue_cloned = queue.clone();
+ // Start listing to new notifications coming from rpc services
+ // Push notifications as responses to the response queue
self.task_group.spawn(
async move {
loop {
@@ -164,30 +114,32 @@ impl Server {
subscription: nt.sub_id,
result: Some(nt.result),
}));
- let response = message::Notification {
+ let notification = message::Notification {
jsonrpc: message::JSONRPC_VERSION.to_string(),
method: nt.method,
params,
};
- debug!("--> {response}");
- queue_cloned.push(serde_json::json!(response)).await;
+ debug!("--> {notification}");
+ queue_cloned.push(serde_json::json!(notification)).await;
}
},
- on_failure,
+ on_complete,
);
let chan = channel.clone();
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
} else {
warn!("Connection {} dropped", endpoint);
}
- // close the subscription channel when the connection dropped
+ // Close the subscription channel when the connection dropped
chan.close();
};
let selfc = self.clone();
+ // Spawn a new task and wait for either a new response in the response
+ // queue or a new request coming from a connected client.
self.task_group.spawn(
async move {
loop {
@@ -197,11 +149,13 @@ impl Server {
.new_request(queue.clone(), channel.clone(), msg?)
.await;
}
- Either::Right(res) => conn.send(res).await?,
+ Either::Right(res) => {
+ conn.send(res).await?;
+ }
}
}
},
- on_failure,
+ on_complete,
);
Ok(())
@@ -220,6 +174,7 @@ impl Server {
};
debug!("<-- {rpc_msg}");
+ // Parse the service name and its method
let srvc_method_str = rpc_msg.method.clone();
let srvc_method: Vec<&str> = srvc_method_str.split('.').collect();
if srvc_method.len() < 2 {
@@ -240,20 +195,22 @@ impl Server {
})
}
- /// Spawns a new task for handling a new request
+ /// Spawns a new task for handling the new request
async fn new_request(
self: &Arc<Self>,
queue: Arc<ResponseQueue<serde_json::Value>>,
- channel: ArcChannel,
+ channel: Arc<Channel>,
msg: serde_json::Value,
) {
trace!("--> new request {msg}");
- let on_failure = |result: TaskResult<Result<()>>| async move {
+ let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Failed to handle a request: {err}");
}
};
let selfc = self.clone();
+ // Spawns a new task for handling the new request, and push the
+ // response to the response queue.
self.task_group.spawn(
async move {
let response = selfc.handle_request(channel, msg).await;
@@ -261,14 +218,15 @@ impl Server {
queue.push(serde_json::json!(response)).await;
Ok(())
},
- on_failure,
+ on_complete,
);
}
- /// Handles a new request
+ /// Handles the new request, and returns an RPC Response that has either
+ /// an error or result
async fn handle_request(
&self,
- channel: ArcChannel,
+ channel: Arc<Channel>,
msg: serde_json::Value,
) -> message::Response {
let req = match self.sanity_check(msg) {
@@ -283,7 +241,9 @@ impl Server {
id: Some(req.msg.id.clone()),
};
+ // Check if the service exists in pubsub services list
if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
+ // Check if the method exists within the service
if let Some(method) = service.get_pubsub_method(&req.method_name) {
let name = format!("{}.{}", service.name(), req.method_name);
let params = req.msg.params.unwrap_or(serde_json::json!(()));
@@ -296,7 +256,9 @@ impl Server {
}
}
+ // Check if the service exists in services list
if let Some(service) = self.services.get(&req.srvc_name) {
+ // Check if the method exists within the service
if let Some(method) = service.get_method(&req.method_name) {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
response.result = match method(params).await {
@@ -337,3 +299,18 @@ impl Server {
}
}
}
+
+fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
+ let err = message::Error {
+ code,
+ message: msg.to_string(),
+ data: None,
+ };
+
+ message::Response {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ error: Some(err),
+ result: None,
+ id,
+ }
+}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 5b3b50b..08d1bbb 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -1,12 +1,12 @@
-use std::{future::Future, pin::Pin};
+use std::{future::Future, pin::Pin, sync::Arc};
use crate::Result;
-use super::channel::ArcChannel;
+use super::channel::Channel;
/// Represents the RPC method
pub type PubSubRPCMethod<'a> =
- Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
+ Box<dyn Fn(Arc<Channel>, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
type PubSubRPCMethodOutput<'a> =
Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
@@ -51,7 +51,8 @@ macro_rules! impl_pubsub_rpc_service {
match name {
$(
stringify!($m) => {
- Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| {
+ Some(Box::new(
+ move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
Box::pin(self.$m(chan, method, params))
}))
}
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs
new file mode 100644
index 0000000..0d70503
--- /dev/null
+++ b/jsonrpc/src/server/response_queue.rs
@@ -0,0 +1,40 @@
+use std::{collections::VecDeque, sync::Arc};
+
+use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
+
+/// A queue for handling responses
+pub(super) struct ResponseQueue<T> {
+ queue: Mutex<VecDeque<T>>,
+ condvar: CondVar,
+}
+
+impl<T: std::fmt::Debug> ResponseQueue<T> {
+ pub(super) fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(VecDeque::new()),
+ condvar: CondVar::new(),
+ })
+ }
+
+ /// Wait while the queue is empty, remove and return the item from the queue,
+ /// panicking if empty (shouldn't happen)
+ pub(super) async fn recv(&self) -> T {
+ let mut queue = self.queue.lock().await;
+
+ while queue.is_empty() {
+ queue = self.condvar.wait(queue).await;
+ }
+
+ match queue.pop_front() {
+ Some(v) => v,
+ None => unreachable!(),
+ }
+ }
+
+ /// Push an item into the queue, notify all waiting tasks that the
+ /// condvar has changed
+ pub(super) async fn push(&self, res: T) {
+ self.queue.lock().await.push_back(res);
+ self.condvar.signal();
+ }
+}