diff options
author | hozan23 <hozan23@proton.me> | 2023-11-08 13:03:27 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-08 13:03:27 +0300 |
commit | 4fe665fc8bc6265baf5bfba6b6a5f3ee2dba63dc (patch) | |
tree | 77c7c40c9725539546e53b00f424deafe5ec81a8 /karyons_core/src/pubsub.rs |
first commit
Diffstat (limited to 'karyons_core/src/pubsub.rs')
-rw-r--r-- | karyons_core/src/pubsub.rs | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/karyons_core/src/pubsub.rs b/karyons_core/src/pubsub.rs new file mode 100644 index 0000000..4cc0ab7 --- /dev/null +++ b/karyons_core/src/pubsub.rs @@ -0,0 +1,115 @@ +use std::{collections::HashMap, sync::Arc}; + +use log::error; +use smol::lock::Mutex; + +use crate::{utils::random_16, Result}; + +pub type ArcPublisher<T> = Arc<Publisher<T>>; +pub type SubscriptionID = u16; + +/// A simple publish-subscribe system. +// # Example +/// +/// ``` +/// use karyons_core::pubsub::{Publisher}; +/// +/// async { +/// let publisher = Publisher::new(); +/// +/// let sub = publisher.subscribe().await; +/// +/// publisher.notify(&String::from("MESSAGE")).await; +/// +/// let msg = sub.recv().await; +/// +/// // .... +/// }; +/// +/// ``` +pub struct Publisher<T> { + subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>, +} + +impl<T: Clone> Publisher<T> { + /// Creates a new Publisher + pub fn new() -> ArcPublisher<T> { + Arc::new(Self { + subs: Mutex::new(HashMap::new()), + }) + } + + /// Subscribe and return a Subscription + pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> { + let mut subs = self.subs.lock().await; + + let chan = smol::channel::unbounded(); + + let mut sub_id = random_16(); + + // While the SubscriptionID already exists, generate a new one + while subs.contains_key(&sub_id) { + sub_id = random_16(); + } + + let sub = Subscription::new(sub_id, self.clone(), chan.1); + subs.insert(sub_id, chan.0); + + sub + } + + /// Unsubscribe from the Publisher + pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) { + self.subs.lock().await.remove(id); + } + + /// Notify all subscribers + pub async fn notify(self: &Arc<Self>, value: &T) { + let mut subs = self.subs.lock().await; + let mut closed_subs = vec![]; + + for (sub_id, sub) in subs.iter() { + if let Err(err) = sub.send(value.clone()).await { + error!("failed to notify {}: {}", sub_id, err); + closed_subs.push(*sub_id); + } + } + + for sub_id in closed_subs.iter() { + subs.remove(sub_id); + } + } +} + +// Subscription +pub struct Subscription<T> { + id: SubscriptionID, + recv_chan: smol::channel::Receiver<T>, + publisher: ArcPublisher<T>, +} + +impl<T: Clone> Subscription<T> { + /// Creates a new Subscription + pub fn new( + id: SubscriptionID, + publisher: ArcPublisher<T>, + recv_chan: smol::channel::Receiver<T>, + ) -> Subscription<T> { + Self { + id, + recv_chan, + publisher, + } + } + + /// Receive a message from the Publisher + pub async fn recv(&self) -> Result<T> { + let msg = self.recv_chan.recv().await?; + Ok(msg) + } + + /// Unsubscribe from the Publisher + pub async fn unsubscribe(&self) { + self.publisher.unsubscribe(&self.id).await; + } +} |