From b8b5f00e9695f46ea30af3ce63aec6dd17f356ae Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 27 Jun 2024 02:39:31 +0200 Subject: Improve async channels error handling and replace unbounded channels with bounded channels Remove all unbounded channels to prevent unbounded memory usage and potential crashes. Use `FuturesUnordered` for sending to multiple channels simultaneously. This prevents the sending loop from blocking if one channel is blocked, and helps handle errors properly. --- core/src/pubsub.rs | 56 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 13 deletions(-) (limited to 'core/src/pubsub.rs') diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs index bcc24ef..09b62ea 100644 --- a/core/src/pubsub.rs +++ b/core/src/pubsub.rs @@ -1,11 +1,14 @@ use std::{collections::HashMap, sync::Arc}; +use futures_util::stream::{FuturesUnordered, StreamExt}; use log::error; -use crate::{async_runtime::lock::Mutex, util::random_16, Result}; +use crate::{async_runtime::lock::Mutex, util::random_32, Result}; + +const CHANNEL_BUFFER_SIZE: usize = 1000; pub type ArcPublisher = Arc>; -pub type SubscriptionID = u16; +pub type SubscriptionID = u32; /// A simple publish-subscribe system. // # Example @@ -28,27 +31,46 @@ pub type SubscriptionID = u16; /// ``` pub struct Publisher { subs: Mutex>>, + subscription_buffer_size: usize, } impl Publisher { - /// Creates a new Publisher + /// Creates a new [`Publisher`] pub fn new() -> ArcPublisher { Arc::new(Self { subs: Mutex::new(HashMap::new()), + subscription_buffer_size: CHANNEL_BUFFER_SIZE, + }) + } + + /// Creates a new [`Publisher`] with the provided buffer size for the + /// [`Subscription`] channel. + /// + /// This is important to control the memory used by the [`Subscription`] channel. + /// If the subscriber can't keep up with the new messages coming, then the + /// channel buffer will fill with new messages, and if the buffer is full, + /// the emit function will block until the subscriber starts to process + /// the buffered messages. + /// + /// If `size` is zero, this function will panic. + pub fn with_buffer_size(size: usize) -> ArcPublisher { + Arc::new(Self { + subs: Mutex::new(HashMap::new()), + subscription_buffer_size: size, }) } - /// Subscribe and return a Subscription + /// Subscribes and return a [`Subscription`] pub async fn subscribe(self: &Arc) -> Subscription { let mut subs = self.subs.lock().await; - let chan = async_channel::unbounded(); + let chan = async_channel::bounded(self.subscription_buffer_size); - let mut sub_id = random_16(); + let mut sub_id = random_32(); - // While the SubscriptionID already exists, generate a new one + // Generate a new one if sub_id already exists while subs.contains_key(&sub_id) { - sub_id = random_16(); + sub_id = random_32(); } let sub = Subscription::new(sub_id, self.clone(), chan.1); @@ -57,22 +79,30 @@ impl Publisher { sub } - /// Unsubscribe from the Publisher + /// Unsubscribes from the publisher pub async fn unsubscribe(self: &Arc, id: &SubscriptionID) { self.subs.lock().await.remove(id); } - /// Notify all subscribers + /// Notifies all subscribers pub async fn notify(self: &Arc, value: &T) { let mut subs = self.subs.lock().await; + + let mut results = FuturesUnordered::new(); let mut closed_subs = vec![]; for (sub_id, sub) in subs.iter() { - if let Err(err) = sub.send(value.clone()).await { - error!("failed to notify {}: {}", sub_id, err); - closed_subs.push(*sub_id); + let result = async { (*sub_id, sub.send(value.clone()).await) }; + results.push(result); + } + + while let Some((id, fut_err)) = results.next().await { + if let Err(err) = fut_err { + error!("failed to notify {}: {}", id, err); + closed_subs.push(id); } } + drop(results); for sub_id in closed_subs.iter() { subs.remove(sub_id); -- cgit v1.2.3