From 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@karyontech.net>
Date: Thu, 13 Jun 2024 05:52:48 +0200
Subject: jsonrpc: spread out comments and clean up

---
 jsonrpc/README.md                         |  19 ++++-
 jsonrpc/examples/pubsub_server.rs         |   8 +-
 jsonrpc/examples/tokio_server/Cargo.lock  |   8 +-
 jsonrpc/examples/tokio_server/src/main.rs |  12 ++-
 jsonrpc/src/client/builder.rs             |  10 +--
 jsonrpc/src/client/message_dispatcher.rs  |  54 ++++++++++++
 jsonrpc/src/client/mod.rs                 | 107 +++++++++---------------
 jsonrpc/src/client/subscriber.rs          |  64 +++++++++++++++
 jsonrpc/src/lib.rs                        |   2 +-
 jsonrpc/src/server/channel.rs             |   8 +-
 jsonrpc/src/server/mod.rs                 | 131 ++++++++++++------------------
 jsonrpc/src/server/pubsub_service.rs      |   9 +-
 jsonrpc/src/server/response_queue.rs      |  40 +++++++++
 13 files changed, 293 insertions(+), 179 deletions(-)
 create mode 100644 jsonrpc/src/client/message_dispatcher.rs
 create mode 100644 jsonrpc/src/client/subscriber.rs
 create mode 100644 jsonrpc/src/server/response_queue.rs

(limited to 'jsonrpc')

diff --git a/jsonrpc/README.md b/jsonrpc/README.md
index 5fc6847..b4ee580 100644
--- a/jsonrpc/README.md
+++ b/jsonrpc/README.md
@@ -13,6 +13,15 @@ features:
 - Allows passing an `async_executors::Executor` or tokio's `Runtime` when building
   the server.
 
+
+## Install 
+
+```bash
+    
+$ cargo add karyon_jsonrpc 
+
+```
+
 ## Example
 
 ```rust
@@ -23,7 +32,7 @@ use smol::stream::StreamExt;
 
 use karyon_jsonrpc::{
     Error, Server, Client, rpc_impl, rpc_pubsub_impl, message::SubscriptionID, 
-    ArcChannel
+    Channel
 };
 
 struct HelloWorld {}
@@ -46,7 +55,7 @@ impl HelloWorld {
 
 #[rpc_pubsub_impl]
 impl HelloWorld {
-    async fn log_subscribe(&self, chan: ArcChannel, method: String, _params: Value) -> Result<Value, Error> {
+    async fn log_subscribe(&self, chan: Arc<Channel>, method: String, _params: Value) -> Result<Value, Error> {
         let sub = chan.new_subscription(&method).await;
         let sub_id = sub.id.clone();
         smol::spawn(async move {
@@ -63,7 +72,7 @@ impl HelloWorld {
         Ok(serde_json::json!(sub_id))
     }
 
-    async fn log_unsubscribe(&self, chan: ArcChannel, method: String, params: Value) -> Result<Value, Error> {
+    async fn log_unsubscribe(&self, chan: Arc<Channel>, method: String, params: Value) -> Result<Value, Error> {
         let sub_id: SubscriptionID = serde_json::from_value(params)?;
         chan.remove_subscription(&sub_id).await;
         Ok(serde_json::json!(true))
@@ -126,6 +135,10 @@ async {
 
 ```
 
+## Supported Client Implementations 
 
+- [X] [Golang](https://github.com/karyontech/karyon-go)
+- [ ] Python 
+- [ ] JavaScript/TypeScript 
 
 
diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs
index bae5b37..74eb907 100644
--- a/jsonrpc/examples/pubsub_server.rs
+++ b/jsonrpc/examples/pubsub_server.rs
@@ -4,9 +4,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use karyon_core::async_util::sleep;
-use karyon_jsonrpc::{
-    message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server,
-};
+use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server};
 
 struct Calc {}
 
@@ -30,7 +28,7 @@ impl Calc {
 impl Calc {
     async fn log_subscribe(
         &self,
-        chan: ArcChannel,
+        chan: Arc<Channel>,
         method: String,
         _params: Value,
     ) -> Result<Value, Error> {
@@ -52,7 +50,7 @@ impl Calc {
 
     async fn log_unsubscribe(
         &self,
-        chan: ArcChannel,
+        chan: Arc<Channel>,
         _method: String,
         params: Value,
     ) -> Result<Value, Error> {
diff --git a/jsonrpc/examples/tokio_server/Cargo.lock b/jsonrpc/examples/tokio_server/Cargo.lock
index 16926fc..bd7d714 100644
--- a/jsonrpc/examples/tokio_server/Cargo.lock
+++ b/jsonrpc/examples/tokio_server/Cargo.lock
@@ -663,7 +663,7 @@ dependencies = [
 
 [[package]]
 name = "karyon_core"
-version = "0.1.0"
+version = "0.1.1"
 dependencies = [
  "async-channel",
  "bincode",
@@ -680,7 +680,7 @@ dependencies = [
 
 [[package]]
 name = "karyon_jsonrpc"
-version = "0.1.0"
+version = "0.1.1"
 dependencies = [
  "async-channel",
  "async-trait",
@@ -698,7 +698,7 @@ dependencies = [
 
 [[package]]
 name = "karyon_jsonrpc_macro"
-version = "0.1.0"
+version = "0.1.1"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -708,7 +708,7 @@ dependencies = [
 
 [[package]]
 name = "karyon_net"
-version = "0.1.0"
+version = "0.1.1"
 dependencies = [
  "async-channel",
  "async-trait",
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index 3bb4871..0a47fda 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
-use karyon_jsonrpc::{
-    message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server,
-};
+use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Error, Server};
 
 struct Calc {
     version: String,
@@ -45,16 +43,16 @@ impl Calc {
 impl Calc {
     async fn log_subscribe(
         &self,
-        chan: ArcChannel,
+        chan: Arc<Channel>,
         method: String,
         _params: Value,
     ) -> Result<Value, Error> {
         let sub = chan.new_subscription(&method).await;
-        let sub_id = sub.id.clone();
+        let sub_id = sub.id;
         tokio::spawn(async move {
             loop {
                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                if let Err(_) = sub.notify(serde_json::json!("Hello")).await {
+                if sub.notify(serde_json::json!("Hello")).await.is_err() {
                     break;
                 }
             }
@@ -65,7 +63,7 @@ impl Calc {
 
     async fn log_unsubscribe(
         &self,
-        chan: ArcChannel,
+        chan: Arc<Channel>,
         _method: String,
         params: Value,
     ) -> Result<Value, Error> {
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs
index a287070..2263498 100644
--- a/jsonrpc/src/client/builder.rs
+++ b/jsonrpc/src/client/builder.rs
@@ -1,11 +1,11 @@
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 #[cfg(feature = "smol")]
 use futures_rustls::rustls;
 #[cfg(feature = "tokio")]
 use tokio_rustls::rustls;
 
-use karyon_core::{async_runtime::lock::Mutex, async_util::TaskGroup};
+use karyon_core::async_util::TaskGroup;
 use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
 
 #[cfg(feature = "ws")]
@@ -16,7 +16,7 @@ use crate::codec::WsJsonCodec;
 
 use crate::{codec::JsonCodec, Error, Result, TcpConfig};
 
-use super::Client;
+use super::{Client, MessageDispatcher, Subscriber};
 
 const DEFAULT_TIMEOUT: u64 = 3000; // 3s
 
@@ -171,8 +171,8 @@ impl ClientBuilder {
         let client = Arc::new(Client {
             timeout: self.timeout,
             conn,
-            chans: Mutex::new(HashMap::new()),
-            subscriptions: Mutex::new(HashMap::new()),
+            message_dispatcher: MessageDispatcher::new(),
+            subscriber: Subscriber::new(),
             task_group: TaskGroup::new(),
         });
         client.start_background_receiving();
diff --git a/jsonrpc/src/client/message_dispatcher.rs b/jsonrpc/src/client/message_dispatcher.rs
new file mode 100644
index 0000000..a803f6e
--- /dev/null
+++ b/jsonrpc/src/client/message_dispatcher.rs
@@ -0,0 +1,54 @@
+use std::collections::HashMap;
+
+use async_channel::{Receiver, Sender};
+
+use karyon_core::async_runtime::lock::Mutex;
+
+use crate::{message, Error, Result};
+
+use super::RequestID;
+
+const CHANNEL_CAP: usize = 10;
+
+/// Manages client requests
+pub(super) struct MessageDispatcher {
+    chans: Mutex<HashMap<RequestID, Sender<message::Response>>>,
+}
+
+impl MessageDispatcher {
+    /// Creates a new MessageDispatcher
+    pub(super) fn new() -> Self {
+        Self {
+            chans: Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Registers a new request with a given ID and returns a Receiver channel
+    /// to wait for the response.
+    pub(super) async fn register(&self, id: RequestID) -> Receiver<message::Response> {
+        let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
+        self.chans.lock().await.insert(id, tx);
+        rx
+    }
+
+    /// Unregisters the request with the provided ID
+    pub(super) async fn unregister(&self, id: &RequestID) {
+        self.chans.lock().await.remove(id);
+    }
+
+    /// Dispatches a response to the channel associated with the response's ID.
+    ///
+    /// If a channel is registered for the response's ID, the response is sent
+    /// through that channel. If no channel is found for the ID, returns an error.
+    pub(super) async fn dispatch(&self, res: message::Response) -> Result<()> {
+        if res.id.is_none() {
+            return Err(Error::InvalidMsg("Response id is none"));
+        }
+        let id: RequestID = serde_json::from_value(res.id.clone().unwrap())?;
+        let val = self.chans.lock().await.remove(&id);
+        match val {
+            Some(tx) => tx.send(res).await.map_err(Error::from),
+            None => Err(Error::InvalidMsg("Receive unknown message")),
+        }
+    }
+}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 3a4505c..95354d3 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -1,13 +1,13 @@
 pub mod builder;
+mod message_dispatcher;
+mod subscriber;
 
-use std::{collections::HashMap, sync::Arc, time::Duration};
-
-use log::{debug, error, warn};
+use log::{debug, error};
 use serde::{de::DeserializeOwned, Serialize};
 use serde_json::json;
+use std::{sync::Arc, time::Duration};
 
 use karyon_core::{
-    async_runtime::lock::Mutex,
     async_util::{timeout, TaskGroup, TaskResult},
     util::random_32,
 };
@@ -18,21 +18,19 @@ use crate::{
     Error, Result,
 };
 
-const CHANNEL_CAP: usize = 10;
+use message_dispatcher::MessageDispatcher;
+use subscriber::Subscriber;
+pub use subscriber::Subscription;
 
-/// Type alias for a subscription to receive notifications.
-///
-/// The receiver channel is returned by the `subscribe` method to receive
-/// notifications from the server.
-pub type Subscription = async_channel::Receiver<serde_json::Value>;
+type RequestID = u32;
 
 /// Represents an RPC client
 pub struct Client {
     conn: Conn<serde_json::Value>,
     timeout: Option<u64>,
-    chans: Mutex<HashMap<u32, async_channel::Sender<message::Response>>>,
-    subscriptions: Mutex<HashMap<SubscriptionID, async_channel::Sender<serde_json::Value>>>,
+    message_dispatcher: MessageDispatcher,
     task_group: TaskGroup,
+    subscriber: Subscriber,
 }
 
 impl Client {
@@ -67,10 +65,9 @@ impl Client {
             None => return Err(Error::InvalidMsg("Invalid subscription id")),
         };
 
-        let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
-        self.subscriptions.lock().await.insert(sub_id, ch_tx);
+        let rx = self.subscriber.subscribe(sub_id).await;
 
-        Ok((sub_id, ch_rx))
+        Ok((sub_id, rx))
     }
 
     /// Unsubscribes from the provided method, waits for the response, and returns the result.
@@ -79,7 +76,7 @@ impl Client {
     /// and subscription ID. It waits for the response to confirm the unsubscription.
     pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
         let _ = self.send_request(method, sub_id).await?;
-        self.subscriptions.lock().await.remove(&sub_id);
+        self.subscriber.unsubscribe(&sub_id).await;
         Ok(())
     }
 
@@ -88,7 +85,7 @@ impl Client {
         method: &str,
         params: T,
     ) -> Result<message::Response> {
-        let id = random_32();
+        let id: RequestID = random_32();
         let request = message::Request {
             jsonrpc: message::JSONRPC_VERSION.to_string(),
             id: json!(id),
@@ -98,16 +95,24 @@ impl Client {
 
         let req_json = serde_json::to_value(&request)?;
 
+        // Send the json request
         self.conn.send(req_json).await?;
 
-        let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
-        self.chans.lock().await.insert(id, tx);
+        // Register a new request
+        let rx = self.message_dispatcher.register(id).await;
+
+        // Wait for the message dispatcher to send the response
+        let result = match self.timeout {
+            Some(t) => timeout(Duration::from_millis(t), rx.recv()).await?,
+            None => rx.recv().await,
+        };
 
-        let response = match self.wait_for_response(rx).await {
+        let response = match result {
             Ok(r) => r,
             Err(err) => {
-                self.chans.lock().await.remove(&id);
-                return Err(err);
+                // Unregister the request if an error occurs
+                self.message_dispatcher.unregister(&id).await;
+                return Err(err.into());
             }
         };
 
@@ -115,6 +120,8 @@ impl Client {
             return Err(Error::SubscribeError(error.code, error.message));
         }
 
+        // It should be OK to unwrap here, as the message dispatcher checks
+        // for the response id.
         if *response.id.as_ref().unwrap() != request.id {
             return Err(Error::InvalidMsg("Invalid response id"));
         }
@@ -123,28 +130,17 @@ impl Client {
         Ok(response)
     }
 
-    async fn wait_for_response(
-        &self,
-        rx: async_channel::Receiver<message::Response>,
-    ) -> Result<message::Response> {
-        match self.timeout {
-            Some(t) => timeout(Duration::from_millis(t), rx.recv())
-                .await?
-                .map_err(Error::from),
-            None => rx.recv().await.map_err(Error::from),
-        }
-    }
-
     fn start_background_receiving(self: &Arc<Self>) {
         let selfc = self.clone();
-        let on_failure = |result: TaskResult<Result<()>>| async move {
+        let on_complete = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 error!("background receiving stopped: {err}");
             }
-            // drop all subscription channels
-            selfc.subscriptions.lock().await.clear();
+            // Drop all subscription
+            selfc.subscriber.drop_all().await;
         };
         let selfc = self.clone();
+        // Spawn a new task for listing to new coming messages.
         self.task_group.spawn(
             async move {
                 loop {
@@ -157,48 +153,23 @@ impl Client {
                     }
                 }
             },
-            on_failure,
+            on_complete,
         );
     }
 
     async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> {
+        // Check if the received message is of type Response
         if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
             debug!("<-- {res}");
-            if res.id.is_none() {
-                return Err(Error::InvalidMsg("Response id is none"));
-            }
-
-            let id: u32 = serde_json::from_value(res.id.clone().unwrap())?;
-            match self.chans.lock().await.remove(&id) {
-                Some(tx) => tx.send(res).await?,
-                None => return Err(Error::InvalidMsg("Receive unkown message")),
-            }
-
+            self.message_dispatcher.dispatch(res).await?;
             return Ok(());
         }
 
+        // Check if the received message is of type Notification
         if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
             debug!("<-- {nt}");
-            let sub_result: message::NotificationResult = match nt.params {
-                Some(ref p) => serde_json::from_value(p.clone())?,
-                None => return Err(Error::InvalidMsg("Invalid notification msg")),
-            };
-
-            match self
-                .subscriptions
-                .lock()
-                .await
-                .get(&sub_result.subscription)
-            {
-                Some(s) => {
-                    s.send(sub_result.result.unwrap_or(json!(""))).await?;
-                    return Ok(());
-                }
-                None => {
-                    warn!("Receive unknown notification {}", sub_result.subscription);
-                    return Ok(());
-                }
-            }
+            self.subscriber.notify(nt).await?;
+            return Ok(());
         }
 
         error!("Receive unexpected msg: {msg}");
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
new file mode 100644
index 0000000..d47cc2a
--- /dev/null
+++ b/jsonrpc/src/client/subscriber.rs
@@ -0,0 +1,64 @@
+use std::collections::HashMap;
+
+use async_channel::{Receiver, Sender};
+use log::warn;
+use serde_json::json;
+
+use karyon_core::async_runtime::lock::Mutex;
+
+use crate::{
+    message::{Notification, NotificationResult, SubscriptionID},
+    Error, Result,
+};
+
+const CHANNEL_CAP: usize = 10;
+
+/// Manages subscriptions for the client.
+pub(super) struct Subscriber {
+    subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
+}
+
+/// Type alias for a subscription to receive notifications.
+///
+/// The receiver channel is returned by the `subscribe`
+pub type Subscription = Receiver<serde_json::Value>;
+
+impl Subscriber {
+    pub(super) fn new() -> Self {
+        Self {
+            subs: Mutex::new(HashMap::new()),
+        }
+    }
+
+    pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
+        let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+        self.subs.lock().await.insert(id, ch_tx);
+        ch_rx
+    }
+
+    pub(super) async fn drop_all(&self) {
+        self.subs.lock().await.clear();
+    }
+
+    /// Unsubscribe
+    pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
+        self.subs.lock().await.remove(id);
+    }
+
+    pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
+        let nt_res: NotificationResult = match nt.params {
+            Some(ref p) => serde_json::from_value(p.clone())?,
+            None => return Err(Error::InvalidMsg("Invalid notification msg")),
+        };
+        match self.subs.lock().await.get(&nt_res.subscription) {
+            Some(s) => {
+                s.send(nt_res.result.unwrap_or(json!(""))).await?;
+                Ok(())
+            }
+            None => {
+                warn!("Receive unknown notification {}", nt_res.subscription);
+                Ok(())
+            }
+        }
+    }
+}
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index b7b632e..23a6e08 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -10,7 +10,7 @@ pub use client::{builder::ClientBuilder, Client};
 pub use error::{Error, Result};
 pub use server::{
     builder::ServerBuilder,
-    channel::{ArcChannel, Channel, Subscription},
+    channel::{Channel, Subscription},
     pubsub_service::{PubSubRPCMethod, PubSubRPCService},
     service::{RPCMethod, RPCService},
     Server,
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index a9d1002..bb62b9f 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -4,8 +4,6 @@ use karyon_core::{async_runtime::lock::Mutex, util::random_32};
 
 use crate::{message::SubscriptionID, Error, Result};
 
-pub type ArcChannel = Arc<Channel>;
-
 pub(crate) struct NewNotification {
     pub sub_id: SubscriptionID,
     pub result: serde_json::Value,
@@ -52,7 +50,7 @@ impl Subscription {
     }
 }
 
-/// Represents a channel for creating/removing subscriptions
+/// Represents a connection channel for creating/removing subscriptions
 pub struct Channel {
     chan: async_channel::Sender<NewNotification>,
     subs: Mutex<Vec<SubscriptionID>>,
@@ -60,7 +58,7 @@ pub struct Channel {
 
 impl Channel {
     /// Creates a new [`Channel`]
-    pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
+    pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
         Arc::new(Self {
             chan,
             subs: Mutex::new(Vec::new()),
@@ -75,7 +73,7 @@ impl Channel {
         sub
     }
 
-    /// Removes a subscription
+    /// Removes a [`Subscription`]
     pub async fn remove_subscription(&self, id: &SubscriptionID) {
         let mut subs = self.subs.lock().await;
         let i = match subs.iter().position(|i| i == id) {
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 7136be4..09850c5 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -1,24 +1,20 @@
 pub mod builder;
 pub mod channel;
 pub mod pubsub_service;
+mod response_queue;
 pub mod service;
 
-use std::{
-    collections::{HashMap, VecDeque},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 use log::{debug, error, trace, warn};
 
-use karyon_core::{
-    async_runtime::lock::Mutex,
-    async_util::{select, CondVar, Either, TaskGroup, TaskResult},
-};
+use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
 use karyon_net::{Conn, Endpoint, Listener};
 
 use crate::{message, Error, PubSubRPCService, RPCService, Result};
 
-use channel::{ArcChannel, Channel};
+use channel::Channel;
+use response_queue::ResponseQueue;
 
 const CHANNEL_CAP: usize = 10;
 
@@ -27,21 +23,6 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
 pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
 pub const INTERNAL_ERROR_MSG: &str = "Internal error";
 
-fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
-    let err = message::Error {
-        code,
-        message: msg.to_string(),
-        data: None,
-    };
-
-    message::Response {
-        jsonrpc: message::JSONRPC_VERSION.to_string(),
-        error: Some(err),
-        result: None,
-        id,
-    }
-}
-
 struct NewRequest {
     srvc_name: String,
     method_name: String,
@@ -53,42 +34,6 @@ enum SanityCheckResult {
     ErrRes(message::Response),
 }
 
-struct ResponseQueue<T> {
-    queue: Mutex<VecDeque<T>>,
-    condvar: CondVar,
-}
-
-impl<T> ResponseQueue<T> {
-    fn new() -> Arc<Self> {
-        Arc::new(Self {
-            queue: Mutex::new(VecDeque::new()),
-            condvar: CondVar::new(),
-        })
-    }
-
-    /// Wait while the queue is empty, remove and return the item from the queue,
-    /// panicking if empty (shouldn't happen)
-    async fn recv(&self) -> T {
-        let mut queue = self.queue.lock().await;
-
-        while queue.is_empty() {
-            queue = self.condvar.wait(queue).await;
-        }
-
-        match queue.pop_front() {
-            Some(v) => v,
-            None => unreachable!(),
-        }
-    }
-
-    /// Push an item into the queue, notify all waiting tasks that the
-    /// condvar has changed
-    async fn push(&self, res: T) {
-        self.queue.lock().await.push_back(res);
-        self.condvar.broadcast();
-    }
-}
-
 /// Represents an RPC server
 pub struct Server {
     listener: Listener<serde_json::Value>,
@@ -105,13 +50,14 @@ impl Server {
 
     /// Starts the RPC server
     pub async fn start(self: &Arc<Self>) {
-        let on_failure = |result: TaskResult<Result<()>>| async move {
+        let on_complete = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 error!("Accept loop stopped: {err}");
             }
         };
 
         let selfc = self.clone();
+        // Spawns a new task for each new incoming connection
         self.task_group.spawn(
             async move {
                 loop {
@@ -127,7 +73,7 @@ impl Server {
                     }
                 }
             },
-            on_failure,
+            on_complete,
         );
     }
 
@@ -142,12 +88,14 @@ impl Server {
         debug!("Handle a new connection {endpoint}");
 
         let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+        // Create a new connection channel for managing subscriptions
         let channel = Channel::new(ch_tx);
 
+        // Create a response queue
         let queue = ResponseQueue::new();
 
         let chan = channel.clone();
-        let on_failure = |result: TaskResult<Result<()>>| async move {
+        let on_complete = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 debug!("Notification loop stopped: {err}");
             }
@@ -156,6 +104,8 @@ impl Server {
         };
 
         let queue_cloned = queue.clone();
+        // Start listing to new notifications coming from rpc services
+        // Push notifications as responses to the response queue
         self.task_group.spawn(
             async move {
                 loop {
@@ -164,30 +114,32 @@ impl Server {
                         subscription: nt.sub_id,
                         result: Some(nt.result),
                     }));
-                    let response = message::Notification {
+                    let notification = message::Notification {
                         jsonrpc: message::JSONRPC_VERSION.to_string(),
                         method: nt.method,
                         params,
                     };
-                    debug!("--> {response}");
-                    queue_cloned.push(serde_json::json!(response)).await;
+                    debug!("--> {notification}");
+                    queue_cloned.push(serde_json::json!(notification)).await;
                 }
             },
-            on_failure,
+            on_complete,
         );
 
         let chan = channel.clone();
-        let on_failure = |result: TaskResult<Result<()>>| async move {
+        let on_complete = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 error!("Connection {} dropped: {}", endpoint, err);
             } else {
                 warn!("Connection {} dropped", endpoint);
             }
-            // close the subscription channel when the connection dropped
+            // Close the subscription channel when the connection dropped
             chan.close();
         };
 
         let selfc = self.clone();
+        // Spawn a new task and wait for either a new response in the response
+        // queue or a new request coming from a connected client.
         self.task_group.spawn(
             async move {
                 loop {
@@ -197,11 +149,13 @@ impl Server {
                                 .new_request(queue.clone(), channel.clone(), msg?)
                                 .await;
                         }
-                        Either::Right(res) => conn.send(res).await?,
+                        Either::Right(res) => {
+                            conn.send(res).await?;
+                        }
                     }
                 }
             },
-            on_failure,
+            on_complete,
         );
 
         Ok(())
@@ -220,6 +174,7 @@ impl Server {
         };
         debug!("<-- {rpc_msg}");
 
+        // Parse the service name and its method
         let srvc_method_str = rpc_msg.method.clone();
         let srvc_method: Vec<&str> = srvc_method_str.split('.').collect();
         if srvc_method.len() < 2 {
@@ -240,20 +195,22 @@ impl Server {
         })
     }
 
-    /// Spawns a new task for handling a new request
+    /// Spawns a new task for handling the new request
     async fn new_request(
         self: &Arc<Self>,
         queue: Arc<ResponseQueue<serde_json::Value>>,
-        channel: ArcChannel,
+        channel: Arc<Channel>,
         msg: serde_json::Value,
     ) {
         trace!("--> new request {msg}");
-        let on_failure = |result: TaskResult<Result<()>>| async move {
+        let on_complete = |result: TaskResult<Result<()>>| async move {
             if let TaskResult::Completed(Err(err)) = result {
                 error!("Failed to handle a request: {err}");
             }
         };
         let selfc = self.clone();
+        // Spawns a new task for handling the new request, and push the
+        // response to the response queue.
         self.task_group.spawn(
             async move {
                 let response = selfc.handle_request(channel, msg).await;
@@ -261,14 +218,15 @@ impl Server {
                 queue.push(serde_json::json!(response)).await;
                 Ok(())
             },
-            on_failure,
+            on_complete,
         );
     }
 
-    /// Handles a new request
+    /// Handles the new request, and returns an RPC Response that has either
+    /// an error or result
     async fn handle_request(
         &self,
-        channel: ArcChannel,
+        channel: Arc<Channel>,
         msg: serde_json::Value,
     ) -> message::Response {
         let req = match self.sanity_check(msg) {
@@ -283,7 +241,9 @@ impl Server {
             id: Some(req.msg.id.clone()),
         };
 
+        // Check if the service exists in pubsub services list
         if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
+            // Check if the method exists within the service
             if let Some(method) = service.get_pubsub_method(&req.method_name) {
                 let name = format!("{}.{}", service.name(), req.method_name);
                 let params = req.msg.params.unwrap_or(serde_json::json!(()));
@@ -296,7 +256,9 @@ impl Server {
             }
         }
 
+        // Check if the service exists in services list
         if let Some(service) = self.services.get(&req.srvc_name) {
+            // Check if the method exists within the service
             if let Some(method) = service.get_method(&req.method_name) {
                 let params = req.msg.params.unwrap_or(serde_json::json!(()));
                 response.result = match method(params).await {
@@ -337,3 +299,18 @@ impl Server {
         }
     }
 }
+
+fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message::Response {
+    let err = message::Error {
+        code,
+        message: msg.to_string(),
+        data: None,
+    };
+
+    message::Response {
+        jsonrpc: message::JSONRPC_VERSION.to_string(),
+        error: Some(err),
+        result: None,
+        id,
+    }
+}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 5b3b50b..08d1bbb 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -1,12 +1,12 @@
-use std::{future::Future, pin::Pin};
+use std::{future::Future, pin::Pin, sync::Arc};
 
 use crate::Result;
 
-use super::channel::ArcChannel;
+use super::channel::Channel;
 
 /// Represents the RPC method
 pub type PubSubRPCMethod<'a> =
-    Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
+    Box<dyn Fn(Arc<Channel>, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
 type PubSubRPCMethodOutput<'a> =
     Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
 
@@ -51,7 +51,8 @@ macro_rules! impl_pubsub_rpc_service {
                 match name {
                 $(
                     stringify!($m) => {
-                        Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| {
+                        Some(Box::new(
+                            move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
                             Box::pin(self.$m(chan, method, params))
                         }))
                     }
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs
new file mode 100644
index 0000000..0d70503
--- /dev/null
+++ b/jsonrpc/src/server/response_queue.rs
@@ -0,0 +1,40 @@
+use std::{collections::VecDeque, sync::Arc};
+
+use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
+
+/// A queue for handling responses
+pub(super) struct ResponseQueue<T> {
+    queue: Mutex<VecDeque<T>>,
+    condvar: CondVar,
+}
+
+impl<T: std::fmt::Debug> ResponseQueue<T> {
+    pub(super) fn new() -> Arc<Self> {
+        Arc::new(Self {
+            queue: Mutex::new(VecDeque::new()),
+            condvar: CondVar::new(),
+        })
+    }
+
+    /// Wait while the queue is empty, remove and return the item from the queue,
+    /// panicking if empty (shouldn't happen)
+    pub(super) async fn recv(&self) -> T {
+        let mut queue = self.queue.lock().await;
+
+        while queue.is_empty() {
+            queue = self.condvar.wait(queue).await;
+        }
+
+        match queue.pop_front() {
+            Some(v) => v,
+            None => unreachable!(),
+        }
+    }
+
+    /// Push an item into the queue, notify all waiting tasks that the
+    /// condvar has changed
+    pub(super) async fn push(&self, res: T) {
+        self.queue.lock().await.push_back(res);
+        self.condvar.signal();
+    }
+}
-- 
cgit v1.2.3