From 9aa972dd83a85cec5da71e8e893eb6e07d5db8ca Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 21 Jun 2024 22:45:17 +0200 Subject: jsonrpc/client: fix subscription error when the subscriber cannot keep up Add a limit for receiving notifications for the subscription. If this limit is exceeded, the client will stop and raise an error. The limit is configurable when building a new client. --- jsonrpc/src/client/subscriptions.rs | 69 +++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 15 deletions(-) (limited to 'jsonrpc/src/client/subscriptions.rs') diff --git a/jsonrpc/src/client/subscriptions.rs b/jsonrpc/src/client/subscriptions.rs index 9c8a9f4..f3d8cb2 100644 --- a/jsonrpc/src/client/subscriptions.rs +++ b/jsonrpc/src/client/subscriptions.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, sync::Arc}; use async_channel::{Receiver, Sender}; -use log::warn; use serde_json::json; use serde_json::Value; @@ -12,37 +11,77 @@ use crate::{ Error, Result, }; -/// Type alias for a subscription to receive notifications. -/// -/// The receiver channel is returned by the `subscribe` -pub type Subscription = Receiver; +/// A subscription established when the client's subscribe to a method +pub struct Subscription { + id: SubscriptionID, + rx: Receiver, + tx: Sender, +} + +impl Subscription { + fn new(id: SubscriptionID, buffer_size: usize) -> Arc { + let (tx, rx) = async_channel::bounded(buffer_size); + Arc::new(Self { tx, id, rx }) + } + + pub async fn recv(&self) -> Result { + self.rx.recv().await.map_err(Error::from) + } + + pub fn id(&self) -> SubscriptionID { + self.id + } + + async fn notify(&self, val: Value) -> Result<()> { + if self.tx.is_full() { + return Err(Error::SubscriptionBufferFull); + } + self.tx.send(val).await?; + Ok(()) + } + + fn close(&self) { + self.tx.close(); + } +} /// Manages subscriptions for the client. pub(super) struct Subscriptions { - subs: Mutex>>, + subs: Mutex>>, + sub_buffer_size: usize, } impl Subscriptions { - pub(super) fn new() -> Arc { + /// Creates a new [`Subscriptions`]. + pub(super) fn new(sub_buffer_size: usize) -> Arc { Arc::new(Self { subs: Mutex::new(HashMap::new()), + sub_buffer_size, }) } - pub(super) async fn subscribe(self: &Arc, id: SubscriptionID) -> Subscription { - let (ch_tx, ch_rx) = async_channel::unbounded(); - self.subs.lock().await.insert(id, ch_tx); - ch_rx + /// Returns a new [`Subscription`] + pub(super) async fn subscribe(&self, id: SubscriptionID) -> Arc { + let sub = Subscription::new(id, self.sub_buffer_size); + self.subs.lock().await.insert(id, sub.clone()); + sub } - pub(super) async fn drop_all(&self) { - self.subs.lock().await.clear(); + /// Closes subscription channels and clear the inner map. + pub(super) async fn clear(&self) { + let mut subs = self.subs.lock().await; + for (_, sub) in subs.iter() { + sub.close(); + } + subs.clear(); } + /// Unsubscribe from the provided subscription id. pub(super) async fn unsubscribe(&self, id: &SubscriptionID) { self.subs.lock().await.remove(id); } + /// Notifies the subscription about the given notification. pub(super) async fn notify(&self, nt: Notification) -> Result<()> { let nt_res: NotificationResult = match nt.params { Some(ref p) => serde_json::from_value(p.clone())?, @@ -50,9 +89,9 @@ impl Subscriptions { }; match self.subs.lock().await.get(&nt_res.subscription) { - Some(s) => s.send(nt_res.result.unwrap_or(json!(""))).await?, + Some(s) => s.notify(nt_res.result.unwrap_or(json!(""))).await?, None => { - warn!("Receive unknown notification {}", nt_res.subscription) + return Err(Error::InvalidMsg("Unknown notification")); } } -- cgit v1.2.3