From 9aa972dd83a85cec5da71e8e893eb6e07d5db8ca Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 21 Jun 2024 22:45:17 +0200 Subject: jsonrpc/client: fix subscription error when the subscriber cannot keep up Add a limit for receiving notifications for the subscription. If this limit is exceeded, the client will stop and raise an error. The limit is configurable when building a new client. --- jsonrpc/src/server/builder.rs | 110 +++++++++++++++++++++++++++++++++++++++--- jsonrpc/src/server/mod.rs | 2 +- 2 files changed, 104 insertions(+), 8 deletions(-) (limited to 'jsonrpc/src/server') diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs index 90024f3..ca6d1a7 100644 --- a/jsonrpc/src/server/builder.rs +++ b/jsonrpc/src/server/builder.rs @@ -29,12 +29,91 @@ pub struct ServerBuilder { impl ServerBuilder { /// Adds a new RPC service to the server. + /// + /// # Example + /// ``` + /// use std::sync::Arc; + /// + /// use serde_json::Value; + /// + /// use karyon_jsonrpc::{Server, rpc_impl, RPCError}; + /// + /// struct Ping {} + /// + /// #[rpc_impl] + /// impl Ping { + /// async fn ping(&self, _params: Value) -> Result { + /// Ok(serde_json::json!("Pong")) + /// } + /// } + /// + /// async { + /// let server = Server::builder("ws://127.0.0.1:3000").unwrap() + /// .service(Arc::new(Ping{})) + /// .build().await.unwrap(); + /// }; + /// + /// ``` pub fn service(mut self, service: Arc) -> Self { self.services.insert(service.name(), service); self } /// Adds a new PubSub RPC service to the server. + /// + /// # Example + /// ``` + /// use std::sync::Arc; + /// + /// use serde_json::Value; + /// + /// use karyon_jsonrpc::{ + /// Server, rpc_impl, rpc_pubsub_impl, RPCError, Channel, SubscriptionID, + /// }; + /// + /// struct Ping {} + /// + /// #[rpc_impl] + /// impl Ping { + /// async fn ping(&self, _params: Value) -> Result { + /// Ok(serde_json::json!("Pong")) + /// } + /// } + /// + /// #[rpc_pubsub_impl] + /// impl Ping { + /// async fn log_subscribe( + /// &self, + /// chan: Arc, + /// method: String, + /// _params: Value, + /// ) -> Result { + /// let sub = chan.new_subscription(&method).await; + /// let sub_id = sub.id.clone(); + /// Ok(serde_json::json!(sub_id)) + /// } + /// + /// async fn log_unsubscribe( + /// &self, + /// chan: Arc, + /// _method: String, + /// params: Value, + /// ) -> Result { + /// let sub_id: SubscriptionID = serde_json::from_value(params)?; + /// chan.remove_subscription(&sub_id).await; + /// Ok(serde_json::json!(true)) + /// } + /// } + /// + /// async { + /// let ping_service = Arc::new(Ping{}); + /// let server = Server::builder("ws://127.0.0.1:3000").unwrap() + /// .service(ping_service.clone()) + /// .pubsub_service(ping_service) + /// .build().await.unwrap(); + /// }; + /// + /// ``` pub fn pubsub_service(mut self, service: Arc) -> Self { self.pubsub_services.insert(service.name(), service); self @@ -44,9 +123,15 @@ impl ServerBuilder { /// /// # Example /// - /// ```ignore - /// let tcp_config = TcpConfig::default(); - /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// use karyon_jsonrpc::{Server, TcpConfig}; + /// + /// async { + /// let tcp_config = TcpConfig::default(); + /// let server = Server::builder("ws://127.0.0.1:3000").unwrap() + /// .tcp_config(tcp_config).unwrap() + /// .build().await.unwrap(); + /// }; /// ``` /// /// This function will return an error if the endpoint does not support TCP protocols. @@ -65,8 +150,15 @@ impl ServerBuilder { /// # Example /// /// ```ignore - /// let tls_config = rustls::ServerConfig::new(...); - /// let server = Server::builder()?.tls_config(tls_config)?.build()?; + /// use karon_jsonrpc::Server; + /// use futures_rustls::rustls; + /// + /// async { + /// let tls_config = rustls::ServerConfig::new(...); + /// let server = Server::builder("ws://127.0.0.1:3000").unwrap() + /// .tls_config(tls_config).unwrap() + /// .build().await.unwrap(); + /// }; /// ``` /// /// This function will return an error if the endpoint does not support TLS protocols. @@ -157,8 +249,12 @@ impl Server { /// /// # Example /// - /// ```ignore - /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + /// use karyon_jsonrpc::Server; + /// async { + /// let server = Server::builder("ws://127.0.0.1:3000").unwrap() + /// .build().await.unwrap(); + /// }; /// ``` pub fn builder(endpoint: impl ToEndpoint) -> Result { let endpoint = endpoint.to_endpoint()?; diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 6f539be..00b0fd2 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -126,7 +126,7 @@ impl Server { method: nt.method, params, }; - // debug!("--> {notification}"); + debug!("--> {notification}"); conn_cloned.send(serde_json::json!(notification)).await?; } } -- cgit v1.2.3