aboutsummaryrefslogtreecommitdiff
path: root/karyons_core/src/pubsub.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-09 11:38:19 +0300
committerhozan23 <hozan23@proton.me>2023-11-09 11:38:19 +0300
commit849d827486c75b2ab223d7b0e638dbb5b74d4d1d (patch)
tree41cd3babc37147ec4a40cab8ce8ae31c91cce33b /karyons_core/src/pubsub.rs
parentde1354525895ffbad18f90a5246fd65157f7449e (diff)
rename crates
Diffstat (limited to 'karyons_core/src/pubsub.rs')
-rw-r--r--karyons_core/src/pubsub.rs115
1 files changed, 0 insertions, 115 deletions
diff --git a/karyons_core/src/pubsub.rs b/karyons_core/src/pubsub.rs
deleted file mode 100644
index 4cc0ab7..0000000
--- a/karyons_core/src/pubsub.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use log::error;
-use smol::lock::Mutex;
-
-use crate::{utils::random_16, Result};
-
-pub type ArcPublisher<T> = Arc<Publisher<T>>;
-pub type SubscriptionID = u16;
-
-/// A simple publish-subscribe system.
-// # Example
-///
-/// ```
-/// use karyons_core::pubsub::{Publisher};
-///
-/// async {
-/// let publisher = Publisher::new();
-///
-/// let sub = publisher.subscribe().await;
-///
-/// publisher.notify(&String::from("MESSAGE")).await;
-///
-/// let msg = sub.recv().await;
-///
-/// // ....
-/// };
-///
-/// ```
-pub struct Publisher<T> {
- subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>,
-}
-
-impl<T: Clone> Publisher<T> {
- /// Creates a new Publisher
- pub fn new() -> ArcPublisher<T> {
- Arc::new(Self {
- subs: Mutex::new(HashMap::new()),
- })
- }
-
- /// Subscribe and return a Subscription
- pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> {
- let mut subs = self.subs.lock().await;
-
- let chan = smol::channel::unbounded();
-
- let mut sub_id = random_16();
-
- // While the SubscriptionID already exists, generate a new one
- while subs.contains_key(&sub_id) {
- sub_id = random_16();
- }
-
- let sub = Subscription::new(sub_id, self.clone(), chan.1);
- subs.insert(sub_id, chan.0);
-
- sub
- }
-
- /// Unsubscribe from the Publisher
- pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
- self.subs.lock().await.remove(id);
- }
-
- /// Notify all subscribers
- pub async fn notify(self: &Arc<Self>, value: &T) {
- let mut subs = self.subs.lock().await;
- let mut closed_subs = vec![];
-
- for (sub_id, sub) in subs.iter() {
- if let Err(err) = sub.send(value.clone()).await {
- error!("failed to notify {}: {}", sub_id, err);
- closed_subs.push(*sub_id);
- }
- }
-
- for sub_id in closed_subs.iter() {
- subs.remove(sub_id);
- }
- }
-}
-
-// Subscription
-pub struct Subscription<T> {
- id: SubscriptionID,
- recv_chan: smol::channel::Receiver<T>,
- publisher: ArcPublisher<T>,
-}
-
-impl<T: Clone> Subscription<T> {
- /// Creates a new Subscription
- pub fn new(
- id: SubscriptionID,
- publisher: ArcPublisher<T>,
- recv_chan: smol::channel::Receiver<T>,
- ) -> Subscription<T> {
- Self {
- id,
- recv_chan,
- publisher,
- }
- }
-
- /// Receive a message from the Publisher
- pub async fn recv(&self) -> Result<T> {
- let msg = self.recv_chan.recv().await?;
- Ok(msg)
- }
-
- /// Unsubscribe from the Publisher
- pub async fn unsubscribe(&self) {
- self.publisher.unsubscribe(&self.id).await;
- }
-}