diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-21 22:45:17 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-21 22:45:17 +0200 |
commit | 9aa972dd83a85cec5da71e8e893eb6e07d5db8ca (patch) | |
tree | a227c66e3e75e018f480556e1d58d40306acb12e /jsonrpc/src/client/builder.rs | |
parent | 8fc494d2d508f0e0beefccda31d15a5e387a9791 (diff) |
jsonrpc/client: fix subscription error when the subscriber cannot keep up
Add a limit for receiving notifications for the subscription. If this
limit is exceeded, the client will stop and raise an error. The limit is
configurable when building a new client.
Diffstat (limited to 'jsonrpc/src/client/builder.rs')
-rw-r--r-- | jsonrpc/src/client/builder.rs | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs index 510ce56..c34d2da 100644 --- a/jsonrpc/src/client/builder.rs +++ b/jsonrpc/src/client/builder.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicBool, Arc}; #[cfg(feature = "smol")] use futures_rustls::rustls; @@ -20,6 +20,8 @@ use super::{Client, MessageDispatcher, Subscriptions}; const DEFAULT_TIMEOUT: u64 = 3000; // 3s +const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000; + impl Client { /// Creates a new [`ClientBuilder`] /// @@ -27,8 +29,13 @@ impl Client { /// /// # Example /// - /// ```ignore - /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?; + /// ``` + /// use karyon_jsonrpc::Client; + /// + /// async { + /// let builder = Client::builder("ws://127.0.0.1:3000").unwrap(); + /// let client = builder.build().await.unwrap(); + /// }; /// ``` pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> { let endpoint = endpoint.to_endpoint()?; @@ -37,6 +44,7 @@ impl Client { timeout: Some(DEFAULT_TIMEOUT), tls_config: None, tcp_config: Default::default(), + subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE, }) } } @@ -47,29 +55,66 @@ pub struct ClientBuilder { tls_config: Option<(rustls::ClientConfig, String)>, tcp_config: TcpConfig, timeout: Option<u64>, + subscription_buffer_size: usize, } impl ClientBuilder { /// Set timeout for receiving messages, in milliseconds. Requests will /// fail if it takes longer. /// - /// # Examples + /// # Example /// - /// ```ignore - /// let client = Client::builder()?.set_timeout(5000).build()?; + /// ``` + /// use karyon_jsonrpc::Client; + /// + /// async { + /// let client = Client::builder("ws://127.0.0.1:3000").unwrap() + /// .set_timeout(5000) + /// .build().await.unwrap(); + /// }; /// ``` pub fn set_timeout(mut self, timeout: u64) -> Self { self.timeout = Some(timeout); self } + /// Set max size for the subscription buffer. + /// + /// The client will stop when the subscriber cannot keep up. + /// When subscribing to a method, a new channel with the provided buffer + /// size is initialized. Once the buffer is full and the subscriber doesn't + /// process the messages in the buffer, the client will disconnect and + /// raise an error. + /// + /// # Example + /// + /// ``` + /// use karyon_jsonrpc::Client; + /// + /// async { + /// let client = Client::builder("ws://127.0.0.1:3000").unwrap() + /// .set_max_subscription_buffer_size(10000) + /// .build().await.unwrap(); + /// }; + /// ``` + pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self { + self.subscription_buffer_size = size; + self + } + /// Configure TCP settings for the client. /// /// # Example /// - /// ```ignore - /// let tcp_config = TcpConfig::default(); - /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?; + /// ``` + /// use karyon_jsonrpc::{Client, TcpConfig}; + /// + /// async { + /// let tcp_config = TcpConfig::default(); + /// + /// let client = Client::builder("ws://127.0.0.1:3000").unwrap() + /// .tcp_config(tcp_config).unwrap().build().await.unwrap(); + /// }; /// ``` /// /// This function will return an error if the endpoint does not support TCP protocols. @@ -88,8 +133,16 @@ impl ClientBuilder { /// # Example /// /// ```ignore - /// let tls_config = rustls::ClientConfig::new(...); - /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?; + /// use karyon_jsonrpc::Client; + /// use futures_rustls::rustls; + /// + /// async { + /// let tls_config = rustls::ClientConfig::new(...); + /// + /// let client_builder = Client::builder("ws://127.0.0.1:3000").unwrap() + /// .tls_config(tls_config, "example.com").unwrap() + /// .build().await.unwrap(); + /// }; /// ``` /// /// This function will return an error if the endpoint does not support TLS protocols. @@ -110,13 +163,17 @@ impl ClientBuilder { /// /// # Example /// - /// ```ignore - /// let client = Client::builder(endpoint)? - /// .set_timeout(5000) - /// .tcp_config(tcp_config)? - /// .tls_config(tls_config, "example.com")? - /// .build() - /// .await?; + /// ``` + /// use karyon_jsonrpc::{Client, TcpConfig}; + /// + /// async { + /// let tcp_config = TcpConfig::default(); + /// let client = Client::builder("ws://127.0.0.1:3000").unwrap() + /// .tcp_config(tcp_config).unwrap() + /// .set_timeout(5000) + /// .build().await.unwrap(); + /// }; + /// /// ``` pub async fn build(self) -> Result<Arc<Client>> { let conn: Conn<serde_json::Value> = match self.endpoint { @@ -168,14 +225,17 @@ impl ClientBuilder { _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), }; + let send_chan = async_channel::bounded(10); + let client = Arc::new(Client { timeout: self.timeout, - conn, + disconnect: AtomicBool::new(false), + send_chan, message_dispatcher: MessageDispatcher::new(), - subscriptions: Subscriptions::new(), + subscriptions: Subscriptions::new(self.subscription_buffer_size), task_group: TaskGroup::new(), }); - client.start_background_receiving(); + client.start_background_loop(conn); Ok(client) } } |