diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-06-21 22:45:17 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-06-21 22:45:17 +0200 | 
| commit | 9aa972dd83a85cec5da71e8e893eb6e07d5db8ca (patch) | |
| tree | a227c66e3e75e018f480556e1d58d40306acb12e /jsonrpc/src/client/builder.rs | |
| parent | 8fc494d2d508f0e0beefccda31d15a5e387a9791 (diff) | |
jsonrpc/client: fix subscription error when the subscriber cannot keep up
Add a limit for receiving notifications for the subscription. If this
limit is exceeded, the client will stop and raise an error. The limit is
configurable when building a new client.
Diffstat (limited to 'jsonrpc/src/client/builder.rs')
| -rw-r--r-- | jsonrpc/src/client/builder.rs | 102 | 
1 files changed, 81 insertions, 21 deletions
| diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs index 510ce56..c34d2da 100644 --- a/jsonrpc/src/client/builder.rs +++ b/jsonrpc/src/client/builder.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicBool, Arc};  #[cfg(feature = "smol")]  use futures_rustls::rustls; @@ -20,6 +20,8 @@ use super::{Client, MessageDispatcher, Subscriptions};  const DEFAULT_TIMEOUT: u64 = 3000; // 3s +const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000; +  impl Client {      /// Creates a new [`ClientBuilder`]      /// @@ -27,8 +29,13 @@ impl Client {      ///      /// # Example      /// -    /// ```ignore -    /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?; +    /// ``` +    /// use karyon_jsonrpc::Client; +    ///   +    /// async { +    ///     let builder = Client::builder("ws://127.0.0.1:3000").unwrap(); +    ///     let client = builder.build().await.unwrap(); +    /// };      /// ```      pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {          let endpoint = endpoint.to_endpoint()?; @@ -37,6 +44,7 @@ impl Client {              timeout: Some(DEFAULT_TIMEOUT),              tls_config: None,              tcp_config: Default::default(), +            subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,          })      }  } @@ -47,29 +55,66 @@ pub struct ClientBuilder {      tls_config: Option<(rustls::ClientConfig, String)>,      tcp_config: TcpConfig,      timeout: Option<u64>, +    subscription_buffer_size: usize,  }  impl ClientBuilder {      /// Set timeout for receiving messages, in milliseconds. Requests will      /// fail if it takes longer.      /// -    /// # Examples +    /// # Example      /// -    /// ```ignore -    /// let client = Client::builder()?.set_timeout(5000).build()?; +    /// ``` +    /// use karyon_jsonrpc::Client; +    ///   +    /// async { +    ///     let client = Client::builder("ws://127.0.0.1:3000").unwrap() +    ///         .set_timeout(5000) +    ///         .build().await.unwrap(); +    /// };      /// ```      pub fn set_timeout(mut self, timeout: u64) -> Self {          self.timeout = Some(timeout);          self      } +    /// Set max size for the subscription buffer. +    /// +    /// The client will stop when the subscriber cannot keep up. +    /// When subscribing to a method, a new channel with the provided buffer +    /// size is initialized. Once the buffer is full and the subscriber doesn't +    /// process the messages in the buffer, the client will disconnect and +    /// raise an error. +    /// +    /// # Example +    /// +    /// ``` +    /// use karyon_jsonrpc::Client; +    ///   +    /// async { +    ///     let client = Client::builder("ws://127.0.0.1:3000").unwrap() +    ///         .set_max_subscription_buffer_size(10000) +    ///         .build().await.unwrap(); +    /// }; +    /// ``` +    pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self { +        self.subscription_buffer_size = size; +        self +    } +      /// Configure TCP settings for the client.      ///      /// # Example      /// -    /// ```ignore -    /// let tcp_config = TcpConfig::default(); -    /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?; +    /// ``` +    /// use karyon_jsonrpc::{Client, TcpConfig}; +    ///   +    /// async { +    ///     let tcp_config = TcpConfig::default(); +    /// +    ///     let client = Client::builder("ws://127.0.0.1:3000").unwrap() +    ///         .tcp_config(tcp_config).unwrap().build().await.unwrap(); +    /// };      /// ```      ///      /// This function will return an error if the endpoint does not support TCP protocols. @@ -88,8 +133,16 @@ impl ClientBuilder {      /// # Example      ///      /// ```ignore -    /// let tls_config = rustls::ClientConfig::new(...); -    /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?; +    /// use karyon_jsonrpc::Client; +    /// use futures_rustls::rustls; +    ///   +    /// async { +    ///     let tls_config = rustls::ClientConfig::new(...); +    /// +    ///     let client_builder = Client::builder("ws://127.0.0.1:3000").unwrap() +    ///         .tls_config(tls_config, "example.com").unwrap() +    ///         .build().await.unwrap(); +    /// };      /// ```      ///      /// This function will return an error if the endpoint does not support TLS protocols. @@ -110,13 +163,17 @@ impl ClientBuilder {      ///      /// # Example      /// -    /// ```ignore -    /// let client = Client::builder(endpoint)? -    ///     .set_timeout(5000) -    ///     .tcp_config(tcp_config)? -    ///     .tls_config(tls_config, "example.com")? -    ///     .build() -    ///     .await?; +    /// ``` +    /// use karyon_jsonrpc::{Client, TcpConfig}; +    ///   +    /// async { +    ///     let tcp_config = TcpConfig::default(); +    ///     let client = Client::builder("ws://127.0.0.1:3000").unwrap() +    ///         .tcp_config(tcp_config).unwrap() +    ///         .set_timeout(5000) +    ///         .build().await.unwrap(); +    /// }; +    ///      /// ```      pub async fn build(self) -> Result<Arc<Client>> {          let conn: Conn<serde_json::Value> = match self.endpoint { @@ -168,14 +225,17 @@ impl ClientBuilder {              _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),          }; +        let send_chan = async_channel::bounded(10); +          let client = Arc::new(Client {              timeout: self.timeout, -            conn, +            disconnect: AtomicBool::new(false), +            send_chan,              message_dispatcher: MessageDispatcher::new(), -            subscriptions: Subscriptions::new(), +            subscriptions: Subscriptions::new(self.subscription_buffer_size),              task_group: TaskGroup::new(),          }); -        client.start_background_receiving(); +        client.start_background_loop(conn);          Ok(client)      }  } | 
