diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-17 16:17:17 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-17 16:17:17 +0200 |
commit | 72accd61fad0eea312d868b283c6b26da4802ff8 (patch) | |
tree | a1b8e0df25df3ea6bc6be5b1fe6ebe1f282150c4 /jsonrpc/src/client/subscriber.rs | |
parent | 2d2925c3e21af8ee8f745aa00c0a59dcd9c95df9 (diff) |
jsonrpc/client: use serde untagged enum for decoding Notifications and Responses
Diffstat (limited to 'jsonrpc/src/client/subscriber.rs')
-rw-r--r-- | jsonrpc/src/client/subscriber.rs | 62 |
1 files changed, 0 insertions, 62 deletions
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs deleted file mode 100644 index 168f16e..0000000 --- a/jsonrpc/src/client/subscriber.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::collections::HashMap; - -use async_channel::{Receiver, Sender}; -use log::warn; -use serde_json::json; - -use karyon_core::async_runtime::lock::Mutex; - -use crate::{ - message::{Notification, NotificationResult, SubscriptionID}, - Error, Result, -}; - -/// Manages subscriptions for the client. -pub(super) struct Subscriber { - subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>, -} - -/// Type alias for a subscription to receive notifications. -/// -/// The receiver channel is returned by the `subscribe` -pub type Subscription = Receiver<serde_json::Value>; - -impl Subscriber { - pub(super) fn new() -> Self { - Self { - subs: Mutex::new(HashMap::new()), - } - } - - pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> { - let (ch_tx, ch_rx) = async_channel::unbounded(); - self.subs.lock().await.insert(id, ch_tx); - ch_rx - } - - pub(super) async fn drop_all(&self) { - self.subs.lock().await.clear(); - } - - /// Unsubscribe - pub(super) async fn unsubscribe(&self, id: &SubscriptionID) { - self.subs.lock().await.remove(id); - } - - pub(super) async fn notify(&self, nt: Notification) -> Result<()> { - let nt_res: NotificationResult = match nt.params { - Some(ref p) => serde_json::from_value(p.clone())?, - None => return Err(Error::InvalidMsg("Invalid notification msg")), - }; - match self.subs.lock().await.get(&nt_res.subscription) { - Some(s) => { - s.send(nt_res.result.unwrap_or(json!(""))).await?; - Ok(()) - } - None => { - warn!("Receive unknown notification {}", nt_res.subscription); - Ok(()) - } - } - } -} |