aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/subscriber.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-13 05:52:48 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-13 05:52:48 +0200
commit1c27f751c30196e2c421ae420dacbc4ff25f0fc7 (patch)
treee9a34bea9e6fd45d53a4ad1a7a4e75857ad2fe9a /jsonrpc/src/client/subscriber.rs
parentd6a280f69a6685d5b4da5366626fb76a27f0cc07 (diff)
jsonrpc: spread out comments and clean up
Diffstat (limited to 'jsonrpc/src/client/subscriber.rs')
-rw-r--r--jsonrpc/src/client/subscriber.rs64
1 files changed, 64 insertions, 0 deletions
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
new file mode 100644
index 0000000..d47cc2a
--- /dev/null
+++ b/jsonrpc/src/client/subscriber.rs
@@ -0,0 +1,64 @@
+use std::collections::HashMap;
+
+use async_channel::{Receiver, Sender};
+use log::warn;
+use serde_json::json;
+
+use karyon_core::async_runtime::lock::Mutex;
+
+use crate::{
+ message::{Notification, NotificationResult, SubscriptionID},
+ Error, Result,
+};
+
+const CHANNEL_CAP: usize = 10;
+
+/// Manages subscriptions for the client.
+pub(super) struct Subscriber {
+ subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
+}
+
+/// Type alias for a subscription to receive notifications.
+///
+/// The receiver channel is returned by the `subscribe`
+pub type Subscription = Receiver<serde_json::Value>;
+
+impl Subscriber {
+ pub(super) fn new() -> Self {
+ Self {
+ subs: Mutex::new(HashMap::new()),
+ }
+ }
+
+ pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ self.subs.lock().await.insert(id, ch_tx);
+ ch_rx
+ }
+
+ pub(super) async fn drop_all(&self) {
+ self.subs.lock().await.clear();
+ }
+
+ /// Unsubscribe
+ pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
+ self.subs.lock().await.remove(id);
+ }
+
+ pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
+ let nt_res: NotificationResult = match nt.params {
+ Some(ref p) => serde_json::from_value(p.clone())?,
+ None => return Err(Error::InvalidMsg("Invalid notification msg")),
+ };
+ match self.subs.lock().await.get(&nt_res.subscription) {
+ Some(s) => {
+ s.send(nt_res.result.unwrap_or(json!(""))).await?;
+ Ok(())
+ }
+ None => {
+ warn!("Receive unknown notification {}", nt_res.subscription);
+ Ok(())
+ }
+ }
+ }
+}