aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r--jsonrpc/src/server/channel.rs5
-rw-r--r--jsonrpc/src/server/mod.rs27
-rw-r--r--jsonrpc/src/server/pubsub_service.rs53
-rw-r--r--jsonrpc/src/server/service.rs50
4 files changed, 20 insertions, 115 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 36896b4..7eff66f 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -52,7 +52,7 @@ impl Subscription {
}
/// Checks from the partent if this subscription is still subscribed
- pub async fn still_subscribed(&self) -> bool {
+ async fn still_subscribed(&self) -> bool {
match self.parent.upgrade() {
Some(parent) => parent.subs.lock().await.contains(&self.id),
None => false,
@@ -93,7 +93,8 @@ impl Channel {
subs.remove(i);
}
- pub fn close(&self) {
+ /// Closes the [`Channel`]
+ pub(crate) fn close(&self) {
self.chan.close();
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index e2c3e3a..50a4a8e 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -64,20 +64,21 @@ pub struct Server {
}
impl Server {
- /// Returns the local endpoint.
- pub fn local_endpoint(&self) -> Result<Endpoint> {
- self.listener.local_endpoint().map_err(Error::from)
- }
-
- /// Starts the RPC server
+ /// Starts the RPC server. This will spawn a new task for the main accept loop,
+ /// which listens for incoming connections.
pub fn start(self: &Arc<Self>) {
- let on_complete = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Accept loop stopped: {err}");
+ // Handle the completion of the accept loop task
+ let on_complete = {
+ let this = self.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Main accept loop stopped: {err}");
+ this.shutdown().await;
+ }
}
};
- // Spawns a new task for each new incoming connection
+ // Spawns a new task for the main accept loop
self.task_group.spawn(
{
let this = self.clone();
@@ -100,6 +101,11 @@ impl Server {
);
}
+ /// Returns the local endpoint.
+ pub fn local_endpoint(&self) -> Result<Endpoint> {
+ self.listener.local_endpoint().map_err(Error::from)
+ }
+
/// Shuts down the RPC server
pub async fn shutdown(&self) {
self.task_group.cancel().await;
@@ -335,6 +341,7 @@ impl Server {
response
}
+ /// Initializes a new [`Server`] from the provided [`ServerConfig`]
async fn init(config: ServerConfig, ex: Option<Executor>) -> Result<Arc<Self>> {
let task_group = match ex {
Some(ex) => TaskGroup::with_executor(ex),
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index a6b4c11..909b0b0 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -15,56 +15,3 @@ pub trait PubSubRPCService: Sync + Send {
fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option<PubSubRPCMethod>;
fn name(&self) -> String;
}
-
-/// Implements the [`PubSubRPCService`] trait for a provided type.
-///
-/// # Example
-///
-/// ```
-/// use serde_json::Value;
-///
-/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
-///
-/// struct Hello {}
-///
-/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("foo!"))
-/// }
-///
-/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("bar!"))
-/// }
-/// }
-///
-/// impl_rpc_service!(Hello, foo, bar);
-///
-/// ```
-#[macro_export]
-macro_rules! impl_pubsub_rpc_service {
- ($t:ty, $($m:ident),*) => {
- impl karyon_jsonrpc::PubSubRPCService for $t {
- fn get_pubsub_method<'a>(
- &'a self,
- name: &'a str
- ) -> Option<karyon_jsonrpc::PubSubRPCMethod> {
- match name {
- $(
- stringify!($m) => {
- Some(Box::new(
- move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
- Box::pin(self.$m(chan, method, params))
- }))
- }
- )*
- _ => None,
- }
-
-
- }
- fn name(&self) -> String{
- stringify!($t).to_string()
- }
- }
- };
-}
diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs
index 9cc1d21..787da86 100644
--- a/jsonrpc/src/server/service.rs
+++ b/jsonrpc/src/server/service.rs
@@ -12,53 +12,3 @@ pub trait RPCService: Sync + Send {
fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>;
fn name(&self) -> String;
}
-
-/// Implements the [`RPCService`] trait for a provided type.
-///
-/// # Example
-///
-/// ```
-/// use serde_json::Value;
-///
-/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
-///
-/// struct Hello {}
-///
-/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("foo!"))
-/// }
-///
-/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("bar!"))
-/// }
-/// }
-///
-/// impl_rpc_service!(Hello, foo, bar);
-///
-/// ```
-#[macro_export]
-macro_rules! impl_rpc_service {
- ($t:ty, $($m:ident),*) => {
- impl karyon_jsonrpc::RPCService for $t {
- fn get_method<'a>(
- &'a self,
- name: &'a str
- ) -> Option<karyon_jsonrpc::RPCMethod> {
- match name {
- $(
- stringify!($m) => {
- Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params))))
- }
- )*
- _ => None,
- }
-
-
- }
- fn name(&self) -> String{
- stringify!($t).to_string()
- }
- }
- };
-}