aboutsummaryrefslogtreecommitdiff
path: root/core/src/pubsub.rs
blob: bcc24efe8d60d34c188e630476b7834141f40a72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::{collections::HashMap, sync::Arc};

use log::error;

use crate::{async_runtime::lock::Mutex, util::random_16, Result};

pub type ArcPublisher<T> = Arc<Publisher<T>>;
pub type SubscriptionID = u16;

/// A simple publish-subscribe system.
// # Example
///
/// ```
/// use karyon_core::pubsub::{Publisher};
///
///  async {
///     let publisher = Publisher::new();
///     
///     let sub = publisher.subscribe().await;
///     
///     publisher.notify(&String::from("MESSAGE")).await;
///
///     let msg = sub.recv().await;
///
///     // ....
///  };
///  
/// ```
pub struct Publisher<T> {
    subs: Mutex<HashMap<SubscriptionID, async_channel::Sender<T>>>,
}

impl<T: Clone> Publisher<T> {
    /// Creates a new Publisher
    pub fn new() -> ArcPublisher<T> {
        Arc::new(Self {
            subs: Mutex::new(HashMap::new()),
        })
    }

    /// Subscribe and return a Subscription
    pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> {
        let mut subs = self.subs.lock().await;

        let chan = async_channel::unbounded();

        let mut sub_id = random_16();

        // While the SubscriptionID already exists, generate a new one
        while subs.contains_key(&sub_id) {
            sub_id = random_16();
        }

        let sub = Subscription::new(sub_id, self.clone(), chan.1);
        subs.insert(sub_id, chan.0);

        sub
    }

    /// Unsubscribe from the Publisher
    pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
        self.subs.lock().await.remove(id);
    }

    /// Notify all subscribers
    pub async fn notify(self: &Arc<Self>, value: &T) {
        let mut subs = self.subs.lock().await;
        let mut closed_subs = vec![];

        for (sub_id, sub) in subs.iter() {
            if let Err(err) = sub.send(value.clone()).await {
                error!("failed to notify {}: {}", sub_id, err);
                closed_subs.push(*sub_id);
            }
        }

        for sub_id in closed_subs.iter() {
            subs.remove(sub_id);
        }
    }
}

// Subscription
pub struct Subscription<T> {
    id: SubscriptionID,
    recv_chan: async_channel::Receiver<T>,
    publisher: ArcPublisher<T>,
}

impl<T: Clone> Subscription<T> {
    /// Creates a new Subscription
    pub fn new(
        id: SubscriptionID,
        publisher: ArcPublisher<T>,
        recv_chan: async_channel::Receiver<T>,
    ) -> Subscription<T> {
        Self {
            id,
            recv_chan,
            publisher,
        }
    }

    /// Receive a message from the Publisher
    pub async fn recv(&self) -> Result<T> {
        let msg = self.recv_chan.recv().await?;
        Ok(msg)
    }

    /// Unsubscribe from the Publisher
    pub async fn unsubscribe(&self) {
        self.publisher.unsubscribe(&self.id).await;
    }
}