aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/subscriptions.rs
blob: 3583b334d633803d27879029039472823f785829 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{collections::HashMap, sync::Arc};

use async_channel::{Receiver, Sender};
use serde_json::json;
use serde_json::Value;

use karyon_core::async_runtime::lock::Mutex;

use crate::{
    message::{Notification, NotificationResult, SubscriptionID},
    Error, Result,
};

/// A subscription established when the client subscribes to a method.
///
/// Incoming notification values are pushed into an internal bounded channel
/// and handed to the consumer through [`Subscription::recv`].
pub struct Subscription {
    /// Identifier of this subscription.
    id: SubscriptionID,
    /// Receiving half of the notification channel, read by [`Subscription::recv`].
    rx: Receiver<Value>,
    /// Sending half of the notification channel; closing it ends the subscription.
    tx: Sender<Value>,
}

impl Subscription {
    fn new(id: SubscriptionID, buffer_size: usize) -> Arc<Self> {
        let (tx, rx) = async_channel::bounded(buffer_size);
        Arc::new(Self { tx, id, rx })
    }

    pub async fn recv(&self) -> Result<Value> {
        self.rx.recv().await.map_err(|_| Error::SubscriptionClosed)
    }

    pub fn id(&self) -> SubscriptionID {
        self.id
    }

    async fn notify(&self, val: Value) -> Result<()> {
        if self.tx.is_full() {
            return Err(Error::SubscriptionBufferFull);
        }
        self.tx.send(val).await?;
        Ok(())
    }

    fn close(&self) {
        self.tx.close();
    }
}

/// Manages subscriptions for the client.
///
/// Keeps every active [`Subscription`] indexed by its [`SubscriptionID`] so
/// that incoming notifications can be routed to the matching subscription.
pub(super) struct Subscriptions {
    /// Active subscriptions keyed by id, guarded by an async mutex.
    subs: Mutex<HashMap<SubscriptionID, Arc<Subscription>>>,
    /// Channel buffer size given to every [`Subscription`] created by
    /// [`Subscriptions::subscribe`].
    sub_buffer_size: usize,
}

impl Subscriptions {
    /// Creates a new [`Subscriptions`].
    pub(super) fn new(sub_buffer_size: usize) -> Arc<Self> {
        Arc::new(Self {
            subs: Mutex::new(HashMap::new()),
            sub_buffer_size,
        })
    }

    /// Returns a new [`Subscription`]
    pub(super) async fn subscribe(&self, id: SubscriptionID) -> Arc<Subscription> {
        let sub = Subscription::new(id, self.sub_buffer_size);
        self.subs.lock().await.insert(id, sub.clone());
        sub
    }

    /// Closes subscription channels and clear the inner map.
    pub(super) async fn clear(&self) {
        let mut subs = self.subs.lock().await;
        for (_, sub) in subs.iter() {
            sub.close();
        }
        subs.clear();
    }

    /// Unsubscribe from the provided subscription id.
    pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
        if let Some(sub) = self.subs.lock().await.remove(id) {
            sub.close();
        }
    }

    /// Notifies the subscription about the given notification.
    pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
        let nt_res: NotificationResult = match nt.params {
            Some(ref p) => serde_json::from_value(p.clone())?,
            None => return Err(Error::InvalidMsg("Invalid notification msg")),
        };

        match self.subs.lock().await.get(&nt_res.subscription) {
            Some(s) => s.notify(nt_res.result.unwrap_or(json!(""))).await?,
            None => {
                return Err(Error::InvalidMsg("Unknown notification"));
            }
        }

        Ok(())
    }
}