From 3c55168b72c022b618822c7993b7692f583506db Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 30 Jun 2024 20:03:02 +0200 Subject: jsonrpc: remove redundant macro codes in the main crate and clean up internal proc macros --- jsonrpc/src/server/channel.rs | 5 ++-- jsonrpc/src/server/mod.rs | 27 +++++++++++------- jsonrpc/src/server/pubsub_service.rs | 53 ------------------------------------ jsonrpc/src/server/service.rs | 50 ---------------------------------- 4 files changed, 20 insertions(+), 115 deletions(-) (limited to 'jsonrpc/src/server') diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 36896b4..7eff66f 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -52,7 +52,7 @@ impl Subscription { } /// Checks from the partent if this subscription is still subscribed - pub async fn still_subscribed(&self) -> bool { + async fn still_subscribed(&self) -> bool { match self.parent.upgrade() { Some(parent) => parent.subs.lock().await.contains(&self.id), None => false, @@ -93,7 +93,8 @@ impl Channel { subs.remove(i); } - pub fn close(&self) { + /// Closes the [`Channel`] + pub(crate) fn close(&self) { self.chan.close(); } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index e2c3e3a..50a4a8e 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -64,20 +64,21 @@ pub struct Server { } impl Server { - /// Returns the local endpoint. - pub fn local_endpoint(&self) -> Result { - self.listener.local_endpoint().map_err(Error::from) - } - - /// Starts the RPC server + /// Starts the RPC server. This will spawn a new task for the main accept loop, + /// which listens for incoming connections. pub fn start(self: &Arc) { - let on_complete = |result: TaskResult>| async move { - if let TaskResult::Completed(Err(err)) = result { - error!("Accept loop stopped: {err}"); + // Handle the completion of the accept loop task + let on_complete = { + let this = self.clone(); + |result: TaskResult>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Main accept loop stopped: {err}"); + this.shutdown().await; + } } }; - // Spawns a new task for each new incoming connection + // Spawns a new task for the main accept loop self.task_group.spawn( { let this = self.clone(); @@ -100,6 +101,11 @@ impl Server { ); } + /// Returns the local endpoint. + pub fn local_endpoint(&self) -> Result { + self.listener.local_endpoint().map_err(Error::from) + } + /// Shuts down the RPC server pub async fn shutdown(&self) { self.task_group.cancel().await; @@ -335,6 +341,7 @@ impl Server { response } + /// Initializes a new [`Server`] from the provided [`ServerConfig`] async fn init(config: ServerConfig, ex: Option) -> Result> { let task_group = match ex { Some(ex) => TaskGroup::with_executor(ex), diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index a6b4c11..909b0b0 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -15,56 +15,3 @@ pub trait PubSubRPCService: Sync + Send { fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option; fn name(&self) -> String; } - -/// Implements the [`PubSubRPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// impl_rpc_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! impl_pubsub_rpc_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::PubSubRPCService for $t { - fn get_pubsub_method<'a>( - &'a self, - name: &'a str - ) -> Option { - match name { - $( - stringify!($m) => { - Some(Box::new( - move |chan: std::sync::Arc, method: String, params: serde_json::Value| { - Box::pin(self.$m(chan, method, params)) - })) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs index 9cc1d21..787da86 100644 --- a/jsonrpc/src/server/service.rs +++ b/jsonrpc/src/server/service.rs @@ -12,53 +12,3 @@ pub trait RPCService: Sync + Send { fn get_method<'a>(&'a self, name: &'a str) -> Option; fn name(&self) -> String; } - -/// Implements the [`RPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// impl_rpc_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! impl_rpc_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { - fn get_method<'a>( - &'a self, - name: &'a str - ) -> Option { - match name { - $( - stringify!($m) => { - Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} -- cgit v1.2.3