diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
commit | b8b5f00e9695f46ea30af3ce63aec6dd17f356ae (patch) | |
tree | 3f1b07539c248f9536f5c7b6e3870e235d4f49d7 /core/src/pubsub.rs | |
parent | 1a3ef2d77ab54bfe286f7400ac0cee2e25ea14e3 (diff) |
Improve async channels error handling and replace unbounded channels with bounded channels
Remove all unbounded channels to prevent unbounded memory usage and
potential crashes.
Use `FuturesUnordered` for sending to multiple channels simultaneously.
This prevents the sending loop from blocking if one channel is blocked,
and helps handle errors properly.
Diffstat (limited to 'core/src/pubsub.rs')
-rw-r--r-- | core/src/pubsub.rs | 56 |
1 files changed, 43 insertions, 13 deletions
diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs index bcc24ef..09b62ea 100644 --- a/core/src/pubsub.rs +++ b/core/src/pubsub.rs @@ -1,11 +1,14 @@ use std::{collections::HashMap, sync::Arc}; +use futures_util::stream::{FuturesUnordered, StreamExt}; use log::error; -use crate::{async_runtime::lock::Mutex, util::random_16, Result}; +use crate::{async_runtime::lock::Mutex, util::random_32, Result}; + +const CHANNEL_BUFFER_SIZE: usize = 1000; pub type ArcPublisher<T> = Arc<Publisher<T>>; -pub type SubscriptionID = u16; +pub type SubscriptionID = u32; /// A simple publish-subscribe system. // # Example @@ -28,27 +31,46 @@ pub type SubscriptionID = u16; /// ``` pub struct Publisher<T> { subs: Mutex<HashMap<SubscriptionID, async_channel::Sender<T>>>, + subscription_buffer_size: usize, } impl<T: Clone> Publisher<T> { - /// Creates a new Publisher + /// Creates a new [`Publisher`] pub fn new() -> ArcPublisher<T> { Arc::new(Self { subs: Mutex::new(HashMap::new()), + subscription_buffer_size: CHANNEL_BUFFER_SIZE, + }) + } + + /// Creates a new [`Publisher`] with the provided buffer size for the + /// [`Subscription`] channel. + /// + /// This is important to control the memory used by the [`Subscription`] channel. + /// If the subscriber can't keep up with the new messages coming, then the + /// channel buffer will fill with new messages, and if the buffer is full, + /// the emit function will block until the subscriber starts to process + /// the buffered messages. + /// + /// If `size` is zero, this function will panic. + pub fn with_buffer_size(size: usize) -> ArcPublisher<T> { + Arc::new(Self { + subs: Mutex::new(HashMap::new()), + subscription_buffer_size: size, }) } - /// Subscribe and return a Subscription + /// Subscribes and return a [`Subscription`] pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> { let mut subs = self.subs.lock().await; - let chan = async_channel::unbounded(); + let chan = async_channel::bounded(self.subscription_buffer_size); - let mut sub_id = random_16(); + let mut sub_id = random_32(); - // While the SubscriptionID already exists, generate a new one + // Generate a new one if sub_id already exists while subs.contains_key(&sub_id) { - sub_id = random_16(); + sub_id = random_32(); } let sub = Subscription::new(sub_id, self.clone(), chan.1); @@ -57,22 +79,30 @@ impl<T: Clone> Publisher<T> { sub } - /// Unsubscribe from the Publisher + /// Unsubscribes from the publisher pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) { self.subs.lock().await.remove(id); } - /// Notify all subscribers + /// Notifies all subscribers pub async fn notify(self: &Arc<Self>, value: &T) { let mut subs = self.subs.lock().await; + + let mut results = FuturesUnordered::new(); let mut closed_subs = vec![]; for (sub_id, sub) in subs.iter() { - if let Err(err) = sub.send(value.clone()).await { - error!("failed to notify {}: {}", sub_id, err); - closed_subs.push(*sub_id); + let result = async { (*sub_id, sub.send(value.clone()).await) }; + results.push(result); + } + + while let Some((id, fut_err)) = results.next().await { + if let Err(err) = fut_err { + error!("failed to notify {}: {}", id, err); + closed_subs.push(id); } } + drop(results); for sub_id in closed_subs.iter() { subs.remove(sub_id); |