aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/builder.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-21 22:45:17 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-21 22:45:17 +0200
commit9aa972dd83a85cec5da71e8e893eb6e07d5db8ca (patch)
treea227c66e3e75e018f480556e1d58d40306acb12e /jsonrpc/src/server/builder.rs
parent8fc494d2d508f0e0beefccda31d15a5e387a9791 (diff)
jsonrpc/client: fix subscription error when the subscriber cannot keep up
Add a limit for receiving notifications for the subscription. If this limit is exceeded, the client will stop and raise an error. The limit is configurable when building a new client.
Diffstat (limited to 'jsonrpc/src/server/builder.rs')
-rw-r--r--jsonrpc/src/server/builder.rs110
1 files changed, 103 insertions, 7 deletions
diff --git a/jsonrpc/src/server/builder.rs b/jsonrpc/src/server/builder.rs
index 90024f3..ca6d1a7 100644
--- a/jsonrpc/src/server/builder.rs
+++ b/jsonrpc/src/server/builder.rs
@@ -29,12 +29,91 @@ pub struct ServerBuilder {
impl ServerBuilder {
/// Adds a new RPC service to the server.
+ ///
+ /// # Example
+ /// ```
+ /// use std::sync::Arc;
+ ///
+ /// use serde_json::Value;
+ ///
+ /// use karyon_jsonrpc::{Server, rpc_impl, RPCError};
+ ///
+ /// struct Ping {}
+ ///
+ /// #[rpc_impl]
+ /// impl Ping {
+ /// async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
+ /// Ok(serde_json::json!("Pong"))
+ /// }
+ /// }
+ ///
+ /// async {
+ /// let server = Server::builder("ws://127.0.0.1:3000").unwrap()
+ /// .service(Arc::new(Ping{}))
+ /// .build().await.unwrap();
+ /// };
+ ///
+ /// ```
pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
self.services.insert(service.name(), service);
self
}
/// Adds a new PubSub RPC service to the server.
+ ///
+ /// # Example
+ /// ```
+ /// use std::sync::Arc;
+ ///
+ /// use serde_json::Value;
+ ///
+ /// use karyon_jsonrpc::{
+ /// Server, rpc_impl, rpc_pubsub_impl, RPCError, Channel, SubscriptionID,
+ /// };
+ ///
+ /// struct Ping {}
+ ///
+ /// #[rpc_impl]
+ /// impl Ping {
+ /// async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
+ /// Ok(serde_json::json!("Pong"))
+ /// }
+ /// }
+ ///
+ /// #[rpc_pubsub_impl]
+ /// impl Ping {
+ /// async fn log_subscribe(
+ /// &self,
+ /// chan: Arc<Channel>,
+ /// method: String,
+ /// _params: Value,
+ /// ) -> Result<Value, RPCError> {
+ /// let sub = chan.new_subscription(&method).await;
+ /// let sub_id = sub.id.clone();
+ /// Ok(serde_json::json!(sub_id))
+ /// }
+ ///
+ /// async fn log_unsubscribe(
+ /// &self,
+ /// chan: Arc<Channel>,
+ /// _method: String,
+ /// params: Value,
+ /// ) -> Result<Value, RPCError> {
+ /// let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ /// chan.remove_subscription(&sub_id).await;
+ /// Ok(serde_json::json!(true))
+ /// }
+ /// }
+ ///
+ /// async {
+ /// let ping_service = Arc::new(Ping{});
+ /// let server = Server::builder("ws://127.0.0.1:3000").unwrap()
+ /// .service(ping_service.clone())
+ /// .pubsub_service(ping_service)
+ /// .build().await.unwrap();
+ /// };
+ ///
+ /// ```
pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
self.pubsub_services.insert(service.name(), service);
self
@@ -44,9 +123,15 @@ impl ServerBuilder {
///
/// # Example
///
- /// ```ignore
- /// let tcp_config = TcpConfig::default();
- /// let server = Server::builder()?.tcp_config(tcp_config)?.build()?;
+ /// ```
+ /// use karyon_jsonrpc::{Server, TcpConfig};
+ ///
+ /// async {
+ /// let tcp_config = TcpConfig::default();
+ /// let server = Server::builder("ws://127.0.0.1:3000").unwrap()
+ /// .tcp_config(tcp_config).unwrap()
+ /// .build().await.unwrap();
+ /// };
/// ```
///
/// This function will return an error if the endpoint does not support TCP protocols.
@@ -65,8 +150,15 @@ impl ServerBuilder {
/// # Example
///
/// ```ignore
- /// let tls_config = rustls::ServerConfig::new(...);
- /// let server = Server::builder()?.tls_config(tls_config)?.build()?;
+ /// use karon_jsonrpc::Server;
+ /// use futures_rustls::rustls;
+ ///
+ /// async {
+ /// let tls_config = rustls::ServerConfig::new(...);
+ /// let server = Server::builder("ws://127.0.0.1:3000").unwrap()
+ /// .tls_config(tls_config).unwrap()
+ /// .build().await.unwrap();
+ /// };
/// ```
///
/// This function will return an error if the endpoint does not support TLS protocols.
@@ -157,8 +249,12 @@ impl Server {
///
/// # Example
///
- /// ```ignore
- /// let builder = Server::builder("ws://127.0.0.1:3000")?.build()?;
+ /// ```
+ /// use karyon_jsonrpc::Server;
+ /// async {
+ /// let server = Server::builder("ws://127.0.0.1:3000").unwrap()
+ /// .build().await.unwrap();
+ /// };
/// ```
pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> {
let endpoint = endpoint.to_endpoint()?;