From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- karyons_core/src/pubsub.rs | 115 --------------------------------------------- 1 file changed, 115 deletions(-) delete mode 100644 karyons_core/src/pubsub.rs (limited to 'karyons_core/src/pubsub.rs') diff --git a/karyons_core/src/pubsub.rs b/karyons_core/src/pubsub.rs deleted file mode 100644 index 4cc0ab7..0000000 --- a/karyons_core/src/pubsub.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use log::error; -use smol::lock::Mutex; - -use crate::{utils::random_16, Result}; - -pub type ArcPublisher = Arc>; -pub type SubscriptionID = u16; - -/// A simple publish-subscribe system. -// # Example -/// -/// ``` -/// use karyons_core::pubsub::{Publisher}; -/// -/// async { -/// let publisher = Publisher::new(); -/// -/// let sub = publisher.subscribe().await; -/// -/// publisher.notify(&String::from("MESSAGE")).await; -/// -/// let msg = sub.recv().await; -/// -/// // .... -/// }; -/// -/// ``` -pub struct Publisher { - subs: Mutex>>, -} - -impl Publisher { - /// Creates a new Publisher - pub fn new() -> ArcPublisher { - Arc::new(Self { - subs: Mutex::new(HashMap::new()), - }) - } - - /// Subscribe and return a Subscription - pub async fn subscribe(self: &Arc) -> Subscription { - let mut subs = self.subs.lock().await; - - let chan = smol::channel::unbounded(); - - let mut sub_id = random_16(); - - // While the SubscriptionID already exists, generate a new one - while subs.contains_key(&sub_id) { - sub_id = random_16(); - } - - let sub = Subscription::new(sub_id, self.clone(), chan.1); - subs.insert(sub_id, chan.0); - - sub - } - - /// Unsubscribe from the Publisher - pub async fn unsubscribe(self: &Arc, id: &SubscriptionID) { - self.subs.lock().await.remove(id); - } - - /// Notify all subscribers - pub async fn notify(self: &Arc, value: &T) { - let mut subs = self.subs.lock().await; - let mut closed_subs = vec![]; - - for (sub_id, sub) in subs.iter() { - if let Err(err) = sub.send(value.clone()).await { - error!("failed to notify {}: {}", sub_id, err); - closed_subs.push(*sub_id); - } - } - - for sub_id in closed_subs.iter() { - subs.remove(sub_id); - } - } -} - -// Subscription -pub struct Subscription { - id: SubscriptionID, - recv_chan: smol::channel::Receiver, - publisher: ArcPublisher, -} - -impl Subscription { - /// Creates a new Subscription - pub fn new( - id: SubscriptionID, - publisher: ArcPublisher, - recv_chan: smol::channel::Receiver, - ) -> Subscription { - Self { - id, - recv_chan, - publisher, - } - } - - /// Receive a message from the Publisher - pub async fn recv(&self) -> Result { - let msg = self.recv_chan.recv().await?; - Ok(msg) - } - - /// Unsubscribe from the Publisher - pub async fn unsubscribe(&self) { - self.publisher.unsubscribe(&self.id).await; - } -} -- cgit v1.2.3