diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-17 16:17:17 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-17 16:17:17 +0200 |
commit | 72accd61fad0eea312d868b283c6b26da4802ff8 (patch) | |
tree | a1b8e0df25df3ea6bc6be5b1fe6ebe1f282150c4 /jsonrpc/src/client/subscriptions.rs | |
parent | 2d2925c3e21af8ee8f745aa00c0a59dcd9c95df9 (diff) |
jsonrpc/client: use serde untagged enum for decoding Notifications and Responses
Diffstat (limited to 'jsonrpc/src/client/subscriptions.rs')
-rw-r--r-- | jsonrpc/src/client/subscriptions.rs | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/jsonrpc/src/client/subscriptions.rs b/jsonrpc/src/client/subscriptions.rs new file mode 100644 index 0000000..9c8a9f4 --- /dev/null +++ b/jsonrpc/src/client/subscriptions.rs @@ -0,0 +1,61 @@ +use std::{collections::HashMap, sync::Arc}; + +use async_channel::{Receiver, Sender}; +use log::warn; +use serde_json::json; +use serde_json::Value; + +use karyon_core::async_runtime::lock::Mutex; + +use crate::{ + message::{Notification, NotificationResult, SubscriptionID}, + Error, Result, +}; + +/// Type alias for a subscription to receive notifications. +/// +/// The receiver channel is returned by the `subscribe` +pub type Subscription = Receiver<Value>; + +/// Manages subscriptions for the client. +pub(super) struct Subscriptions { + subs: Mutex<HashMap<SubscriptionID, Sender<Value>>>, +} + +impl Subscriptions { + pub(super) fn new() -> Arc<Self> { + Arc::new(Self { + subs: Mutex::new(HashMap::new()), + }) + } + + pub(super) async fn subscribe(self: &Arc<Self>, id: SubscriptionID) -> Subscription { + let (ch_tx, ch_rx) = async_channel::unbounded(); + self.subs.lock().await.insert(id, ch_tx); + ch_rx + } + + pub(super) async fn drop_all(&self) { + self.subs.lock().await.clear(); + } + + pub(super) async fn unsubscribe(&self, id: &SubscriptionID) { + self.subs.lock().await.remove(id); + } + + pub(super) async fn notify(&self, nt: Notification) -> Result<()> { + let nt_res: NotificationResult = match nt.params { + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), + }; + + match self.subs.lock().await.get(&nt_res.subscription) { + Some(s) => s.send(nt_res.result.unwrap_or(json!(""))).await?, + None => { + warn!("Receive unknown notification {}", nt_res.subscription) + } + } + + Ok(()) + } +} |