1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
use std::{collections::HashMap, sync::Arc};
use log::error;
use smol::lock::Mutex;
use crate::{util::random_16, Result};
pub type ArcPublisher<T> = Arc<Publisher<T>>;
pub type SubscriptionID = u16;
/// A simple publish-subscribe system.
// # Example
///
/// ```
/// use karyon_core::pubsub::{Publisher};
///
/// async {
/// let publisher = Publisher::new();
///
/// let sub = publisher.subscribe().await;
///
/// publisher.notify(&String::from("MESSAGE")).await;
///
/// let msg = sub.recv().await;
///
/// // ....
/// };
///
/// ```
pub struct Publisher<T> {
subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>,
}
impl<T: Clone> Publisher<T> {
/// Creates a new Publisher
pub fn new() -> ArcPublisher<T> {
Arc::new(Self {
subs: Mutex::new(HashMap::new()),
})
}
/// Subscribe and return a Subscription
pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> {
let mut subs = self.subs.lock().await;
let chan = smol::channel::unbounded();
let mut sub_id = random_16();
// While the SubscriptionID already exists, generate a new one
while subs.contains_key(&sub_id) {
sub_id = random_16();
}
let sub = Subscription::new(sub_id, self.clone(), chan.1);
subs.insert(sub_id, chan.0);
sub
}
/// Unsubscribe from the Publisher
pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
self.subs.lock().await.remove(id);
}
/// Notify all subscribers
pub async fn notify(self: &Arc<Self>, value: &T) {
let mut subs = self.subs.lock().await;
let mut closed_subs = vec![];
for (sub_id, sub) in subs.iter() {
if let Err(err) = sub.send(value.clone()).await {
error!("failed to notify {}: {}", sub_id, err);
closed_subs.push(*sub_id);
}
}
for sub_id in closed_subs.iter() {
subs.remove(sub_id);
}
}
}
// Subscription
pub struct Subscription<T> {
id: SubscriptionID,
recv_chan: smol::channel::Receiver<T>,
publisher: ArcPublisher<T>,
}
impl<T: Clone> Subscription<T> {
/// Creates a new Subscription
pub fn new(
id: SubscriptionID,
publisher: ArcPublisher<T>,
recv_chan: smol::channel::Receiver<T>,
) -> Subscription<T> {
Self {
id,
recv_chan,
publisher,
}
}
/// Receive a message from the Publisher
pub async fn recv(&self) -> Result<T> {
let msg = self.recv_chan.recv().await?;
Ok(msg)
}
/// Unsubscribe from the Publisher
pub async fn unsubscribe(&self) {
self.publisher.unsubscribe(&self.id).await;
}
}
|