From 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 13 Jun 2024 05:52:48 +0200 Subject: jsonrpc: spread out comments and clean up --- jsonrpc/src/server/mod.rs | 131 +++++++++++++++++++--------------------------- 1 file changed, 54 insertions(+), 77 deletions(-) (limited to 'jsonrpc/src/server/mod.rs') diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7136be4..09850c5 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -1,24 +1,20 @@ pub mod builder; pub mod channel; pub mod pubsub_service; +mod response_queue; pub mod service; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use log::{debug, error, trace, warn}; -use karyon_core::{ - async_runtime::lock::Mutex, - async_util::{select, CondVar, Either, TaskGroup, TaskResult}, -}; +use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; use karyon_net::{Conn, Endpoint, Listener}; use crate::{message, Error, PubSubRPCService, RPCService, Result}; -use channel::{ArcChannel, Channel}; +use channel::Channel; +use response_queue::ResponseQueue; const CHANNEL_CAP: usize = 10; @@ -27,21 +23,6 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; pub const INTERNAL_ERROR_MSG: &str = "Internal error"; -fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { - let err = message::Error { - code, - message: msg.to_string(), - data: None, - }; - - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: Some(err), - result: None, - id, - } -} - struct NewRequest { srvc_name: String, method_name: String, @@ -53,42 +34,6 @@ enum SanityCheckResult { ErrRes(message::Response), } -struct ResponseQueue { - queue: Mutex>, - condvar: CondVar, -} - -impl ResponseQueue { - fn new() -> Arc { - Arc::new(Self { - queue: Mutex::new(VecDeque::new()), - condvar: CondVar::new(), - }) - } - - /// Wait while the queue is empty, remove and return the item from the queue, - /// panicking if empty (shouldn't happen) - async fn recv(&self) -> T { - let mut queue = self.queue.lock().await; - - while queue.is_empty() { - queue = self.condvar.wait(queue).await; - } - - match queue.pop_front() { - Some(v) => v, - None => unreachable!(), - } - } - - /// Push an item into the queue, notify all waiting tasks that the - /// condvar has changed - async fn push(&self, res: T) { - self.queue.lock().await.push_back(res); - self.condvar.broadcast(); - } -} - /// Represents an RPC server pub struct Server { listener: Listener, @@ -105,13 +50,14 @@ impl Server { /// Starts the RPC server pub async fn start(self: &Arc) { - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Accept loop stopped: {err}"); } }; let selfc = self.clone(); + // Spawns a new task for each new incoming connection self.task_group.spawn( async move { loop { @@ -127,7 +73,7 @@ impl Server { } } }, - on_failure, + on_complete, ); } @@ -142,12 +88,14 @@ impl Server { debug!("Handle a new connection {endpoint}"); let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); + // Create a response queue let queue = ResponseQueue::new(); let chan = channel.clone(); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } @@ -156,6 +104,8 @@ impl Server { }; let queue_cloned = queue.clone(); + // Start listing to new notifications coming from rpc services + // Push notifications as responses to the response queue self.task_group.spawn( async move { loop { @@ -164,30 +114,32 @@ impl Server { subscription: nt.sub_id, result: Some(nt.result), })); - let response = message::Notification { + let notification = message::Notification { jsonrpc: message::JSONRPC_VERSION.to_string(), method: nt.method, params, }; - debug!("--> {response}"); - queue_cloned.push(serde_json::json!(response)).await; + debug!("--> {notification}"); + queue_cloned.push(serde_json::json!(notification)).await; } }, - on_failure, + on_complete, ); let chan = channel.clone(); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); } else { warn!("Connection {} dropped", endpoint); } - // close the subscription channel when the connection dropped + // Close the subscription channel when the connection dropped chan.close(); }; let selfc = self.clone(); + // Spawn a new task and wait for either a new response in the response + // queue or a new request coming from a connected client. self.task_group.spawn( async move { loop { @@ -197,11 +149,13 @@ impl Server { .new_request(queue.clone(), channel.clone(), msg?) .await; } - Either::Right(res) => conn.send(res).await?, + Either::Right(res) => { + conn.send(res).await?; + } } } }, - on_failure, + on_complete, ); Ok(()) @@ -220,6 +174,7 @@ impl Server { }; debug!("<-- {rpc_msg}"); + // Parse the service name and its method let srvc_method_str = rpc_msg.method.clone(); let srvc_method: Vec<&str> = srvc_method_str.split('.').collect(); if srvc_method.len() < 2 { @@ -240,20 +195,22 @@ impl Server { }) } - /// Spawns a new task for handling a new request + /// Spawns a new task for handling the new request async fn new_request( self: &Arc, queue: Arc>, - channel: ArcChannel, + channel: Arc, msg: serde_json::Value, ) { trace!("--> new request {msg}"); - let on_failure = |result: TaskResult>| async move { + let on_complete = |result: TaskResult>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Failed to handle a request: {err}"); } }; let selfc = self.clone(); + // Spawns a new task for handling the new request, and push the + // response to the response queue. self.task_group.spawn( async move { let response = selfc.handle_request(channel, msg).await; @@ -261,14 +218,15 @@ impl Server { queue.push(serde_json::json!(response)).await; Ok(()) }, - on_failure, + on_complete, ); } - /// Handles a new request + /// Handles the new request, and returns an RPC Response that has either + /// an error or result async fn handle_request( &self, - channel: ArcChannel, + channel: Arc, msg: serde_json::Value, ) -> message::Response { let req = match self.sanity_check(msg) { @@ -283,7 +241,9 @@ impl Server { id: Some(req.msg.id.clone()), }; + // Check if the service exists in pubsub services list if let Some(service) = self.pubsub_services.get(&req.srvc_name) { + // Check if the method exists within the service if let Some(method) = service.get_pubsub_method(&req.method_name) { let name = format!("{}.{}", service.name(), req.method_name); let params = req.msg.params.unwrap_or(serde_json::json!(())); @@ -296,7 +256,9 @@ impl Server { } } + // Check if the service exists in services list if let Some(service) = self.services.get(&req.srvc_name) { + // Check if the method exists within the service if let Some(method) = service.get_method(&req.method_name) { let params = req.msg.params.unwrap_or(serde_json::json!(())); response.result = match method(params).await { @@ -337,3 +299,18 @@ impl Server { } } } + +fn pack_err_res(code: i32, msg: &str, id: Option) -> message::Response { + let err = message::Error { + code, + message: msg.to_string(), + data: None, + }; + + message::Response { + jsonrpc: message::JSONRPC_VERSION.to_string(), + error: Some(err), + result: None, + id, + } +} -- cgit v1.2.3