aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/subscriber.rs
blob: 168f16eb05798f0bf4b8d67580b1670dbd57aae3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use std::collections::HashMap;

use async_channel::{Receiver, Sender};
use log::warn;
use serde_json::json;

use karyon_core::async_runtime::lock::Mutex;

use crate::{
    message::{Notification, NotificationResult, SubscriptionID},
    Error, Result,
};

/// Manages subscriptions for the client.
///
/// Keeps one channel sender per registered subscription so that incoming
/// notifications can be routed to the matching receiver.
pub(super) struct Subscriber {
    /// Sender half of each subscription's channel, keyed by subscription ID
    /// and guarded by an async mutex.
    subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
}

/// Type alias for a subscription to receive notifications.
///
/// This is the receiver half of the channel returned by the `subscribe`
/// method. Each received value is the `result` payload of one notification
/// for the subscription, or an empty JSON string when the notification
/// carries no result.
pub type Subscription = Receiver<serde_json::Value>;

impl Subscriber {
    pub(super) fn new() -> Self {
        Self {
            subs: Mutex::new(HashMap::new()),
        }
    }

    pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
        let (ch_tx, ch_rx) = async_channel::unbounded();
        self.subs.lock().await.insert(id, ch_tx);
        ch_rx
    }

    pub(super) async fn drop_all(&self) {
        self.subs.lock().await.clear();
    }

    /// Unsubscribe
    pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
        self.subs.lock().await.remove(id);
    }

    pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
        let nt_res: NotificationResult = match nt.params {
            Some(ref p) => serde_json::from_value(p.clone())?,
            None => return Err(Error::InvalidMsg("Invalid notification msg")),
        };
        match self.subs.lock().await.get(&nt_res.subscription) {
            Some(s) => {
                s.send(nt_res.result.unwrap_or(json!(""))).await?;
                Ok(())
            }
            None => {
                warn!("Receive unknown notification {}", nt_res.subscription);
                Ok(())
            }
        }
    }
}