From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- core/src/pubsub.rs | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 core/src/pubsub.rs (limited to 'core/src/pubsub.rs') diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs new file mode 100644 index 0000000..4cc0ab7 --- /dev/null +++ b/core/src/pubsub.rs @@ -0,0 +1,115 @@ +use std::{collections::HashMap, sync::Arc}; + +use log::error; +use smol::lock::Mutex; + +use crate::{utils::random_16, Result}; + +pub type ArcPublisher = Arc>; +pub type SubscriptionID = u16; + +/// A simple publish-subscribe system. +// # Example +/// +/// ``` +/// use karyons_core::pubsub::{Publisher}; +/// +/// async { +/// let publisher = Publisher::new(); +/// +/// let sub = publisher.subscribe().await; +/// +/// publisher.notify(&String::from("MESSAGE")).await; +/// +/// let msg = sub.recv().await; +/// +/// // .... +/// }; +/// +/// ``` +pub struct Publisher { + subs: Mutex>>, +} + +impl Publisher { + /// Creates a new Publisher + pub fn new() -> ArcPublisher { + Arc::new(Self { + subs: Mutex::new(HashMap::new()), + }) + } + + /// Subscribe and return a Subscription + pub async fn subscribe(self: &Arc) -> Subscription { + let mut subs = self.subs.lock().await; + + let chan = smol::channel::unbounded(); + + let mut sub_id = random_16(); + + // While the SubscriptionID already exists, generate a new one + while subs.contains_key(&sub_id) { + sub_id = random_16(); + } + + let sub = Subscription::new(sub_id, self.clone(), chan.1); + subs.insert(sub_id, chan.0); + + sub + } + + /// Unsubscribe from the Publisher + pub async fn unsubscribe(self: &Arc, id: &SubscriptionID) { + self.subs.lock().await.remove(id); + } + + /// Notify all subscribers + pub async fn notify(self: &Arc, value: &T) { + let mut subs = self.subs.lock().await; + let mut closed_subs = vec![]; + + for (sub_id, sub) in subs.iter() { + if let Err(err) = sub.send(value.clone()).await { + error!("failed to notify {}: {}", sub_id, err); + closed_subs.push(*sub_id); + } + } + + for sub_id in closed_subs.iter() { + subs.remove(sub_id); + } + } +} + +// Subscription +pub struct Subscription { + id: SubscriptionID, + recv_chan: smol::channel::Receiver, + publisher: ArcPublisher, +} + +impl Subscription { + /// Creates a new Subscription + pub fn new( + id: SubscriptionID, + publisher: ArcPublisher, + recv_chan: smol::channel::Receiver, + ) -> Subscription { + Self { + id, + recv_chan, + publisher, + } + } + + /// Receive a message from the Publisher + pub async fn recv(&self) -> Result { + let msg = self.recv_chan.recv().await?; + Ok(msg) + } + + /// Unsubscribe from the Publisher + pub async fn unsubscribe(&self) { + self.publisher.unsubscribe(&self.id).await; + } +} -- cgit v1.2.3