aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/builder.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-21 22:45:17 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-21 22:45:17 +0200
commit9aa972dd83a85cec5da71e8e893eb6e07d5db8ca (patch)
treea227c66e3e75e018f480556e1d58d40306acb12e /jsonrpc/src/client/builder.rs
parent8fc494d2d508f0e0beefccda31d15a5e387a9791 (diff)
jsonrpc/client: fix subscription error when the subscriber cannot keep up
Add a limit for receiving notifications for the subscription. If this limit is exceeded, the client will stop and raise an error. The limit is configurable when building a new client.
Diffstat (limited to 'jsonrpc/src/client/builder.rs')
-rw-r--r--jsonrpc/src/client/builder.rs102
1 files changed, 81 insertions, 21 deletions
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs
index 510ce56..c34d2da 100644
--- a/jsonrpc/src/client/builder.rs
+++ b/jsonrpc/src/client/builder.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::sync::{atomic::AtomicBool, Arc};
#[cfg(feature = "smol")]
use futures_rustls::rustls;
@@ -20,6 +20,8 @@ use super::{Client, MessageDispatcher, Subscriptions};
const DEFAULT_TIMEOUT: u64 = 3000; // 3s
+const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000;
+
impl Client {
/// Creates a new [`ClientBuilder`]
///
@@ -27,8 +29,13 @@ impl Client {
///
/// # Example
///
- /// ```ignore
- /// let builder = Client::builder("ws://127.0.0.1:3000")?.build()?;
+ /// ```
+ /// use karyon_jsonrpc::Client;
+ ///
+ /// async {
+ /// let builder = Client::builder("ws://127.0.0.1:3000").unwrap();
+ /// let client = builder.build().await.unwrap();
+ /// };
/// ```
pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
let endpoint = endpoint.to_endpoint()?;
@@ -37,6 +44,7 @@ impl Client {
timeout: Some(DEFAULT_TIMEOUT),
tls_config: None,
tcp_config: Default::default(),
+ subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,
})
}
}
@@ -47,29 +55,66 @@ pub struct ClientBuilder {
tls_config: Option<(rustls::ClientConfig, String)>,
tcp_config: TcpConfig,
timeout: Option<u64>,
+ subscription_buffer_size: usize,
}
impl ClientBuilder {
/// Set timeout for receiving messages, in milliseconds. Requests will
/// fail if it takes longer.
///
- /// # Examples
+ /// # Example
///
- /// ```ignore
- /// let client = Client::builder()?.set_timeout(5000).build()?;
+ /// ```
+ /// use karyon_jsonrpc::Client;
+ ///
+ /// async {
+ /// let client = Client::builder("ws://127.0.0.1:3000").unwrap()
+ /// .set_timeout(5000)
+ /// .build().await.unwrap();
+ /// };
/// ```
pub fn set_timeout(mut self, timeout: u64) -> Self {
self.timeout = Some(timeout);
self
}
+ /// Set max size for the subscription buffer.
+ ///
+ /// The client will stop when the subscriber cannot keep up.
+ /// When subscribing to a method, a new channel with the provided buffer
+ /// size is initialized. Once the buffer is full and the subscriber doesn't
+ /// process the messages in the buffer, the client will disconnect and
+ /// raise an error.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use karyon_jsonrpc::Client;
+ ///
+ /// async {
+ /// let client = Client::builder("ws://127.0.0.1:3000").unwrap()
+ /// .set_max_subscription_buffer_size(10000)
+ /// .build().await.unwrap();
+ /// };
+ /// ```
+ pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self {
+ self.subscription_buffer_size = size;
+ self
+ }
+
/// Configure TCP settings for the client.
///
/// # Example
///
- /// ```ignore
- /// let tcp_config = TcpConfig::default();
- /// let client = Client::builder()?.tcp_config(tcp_config)?.build()?;
+ /// ```
+ /// use karyon_jsonrpc::{Client, TcpConfig};
+ ///
+ /// async {
+ /// let tcp_config = TcpConfig::default();
+ ///
+ /// let client = Client::builder("ws://127.0.0.1:3000").unwrap()
+ /// .tcp_config(tcp_config).unwrap().build().await.unwrap();
+ /// };
/// ```
///
/// This function will return an error if the endpoint does not support TCP protocols.
@@ -88,8 +133,16 @@ impl ClientBuilder {
/// # Example
///
/// ```ignore
- /// let tls_config = rustls::ClientConfig::new(...);
- /// let client = Client::builder()?.tls_config(tls_config, "example.com")?.build()?;
+ /// use karyon_jsonrpc::Client;
+ /// use futures_rustls::rustls;
+ ///
+ /// async {
+ /// let tls_config = rustls::ClientConfig::new(...);
+ ///
+ /// let client_builder = Client::builder("ws://127.0.0.1:3000").unwrap()
+ /// .tls_config(tls_config, "example.com").unwrap()
+ /// .build().await.unwrap();
+ /// };
/// ```
///
/// This function will return an error if the endpoint does not support TLS protocols.
@@ -110,13 +163,17 @@ impl ClientBuilder {
///
/// # Example
///
- /// ```ignore
- /// let client = Client::builder(endpoint)?
- /// .set_timeout(5000)
- /// .tcp_config(tcp_config)?
- /// .tls_config(tls_config, "example.com")?
- /// .build()
- /// .await?;
+ /// ```
+ /// use karyon_jsonrpc::{Client, TcpConfig};
+ ///
+ /// async {
+ /// let tcp_config = TcpConfig::default();
+ /// let client = Client::builder("ws://127.0.0.1:3000").unwrap()
+ /// .tcp_config(tcp_config).unwrap()
+ /// .set_timeout(5000)
+ /// .build().await.unwrap();
+ /// };
+ ///
/// ```
pub async fn build(self) -> Result<Arc<Client>> {
let conn: Conn<serde_json::Value> = match self.endpoint {
@@ -168,14 +225,17 @@ impl ClientBuilder {
_ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
};
+ let send_chan = async_channel::bounded(10);
+
let client = Arc::new(Client {
timeout: self.timeout,
- conn,
+ disconnect: AtomicBool::new(false),
+ send_chan,
message_dispatcher: MessageDispatcher::new(),
- subscriptions: Subscriptions::new(),
+ subscriptions: Subscriptions::new(self.subscription_buffer_size),
task_group: TaskGroup::new(),
});
- client.start_background_receiving();
+ client.start_background_loop(conn);
Ok(client)
}
}