From 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 13 Jun 2024 05:52:48 +0200 Subject: jsonrpc: spread out comments and clean up --- jsonrpc/src/client/subscriber.rs | 64 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 jsonrpc/src/client/subscriber.rs (limited to 'jsonrpc/src/client/subscriber.rs') diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs new file mode 100644 index 0000000..d47cc2a --- /dev/null +++ b/jsonrpc/src/client/subscriber.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use async_channel::{Receiver, Sender}; +use log::warn; +use serde_json::json; + +use karyon_core::async_runtime::lock::Mutex; + +use crate::{ + message::{Notification, NotificationResult, SubscriptionID}, + Error, Result, +}; + +const CHANNEL_CAP: usize = 10; + +/// Manages subscriptions for the client. +pub(super) struct Subscriber { + subs: Mutex>>, +} + +/// Type alias for a subscription to receive notifications. +/// +/// The receiver channel is returned by the `subscribe` +pub type Subscription = Receiver; + +impl Subscriber { + pub(super) fn new() -> Self { + Self { + subs: Mutex::new(HashMap::new()), + } + } + + pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver { + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + self.subs.lock().await.insert(id, ch_tx); + ch_rx + } + + pub(super) async fn drop_all(&self) { + self.subs.lock().await.clear(); + } + + /// Unsubscribe + pub(super) async fn unsubscribe(&self, id: &SubscriptionID) { + self.subs.lock().await.remove(id); + } + + pub(super) async fn notify(&self, nt: Notification) -> Result<()> { + let nt_res: NotificationResult = match nt.params { + Some(ref p) => serde_json::from_value(p.clone())?, + None => return Err(Error::InvalidMsg("Invalid notification msg")), + }; + match self.subs.lock().await.get(&nt_res.subscription) { + Some(s) => { + s.send(nt_res.result.unwrap_or(json!(""))).await?; + Ok(()) + } + None => { + warn!("Receive unknown notification {}", nt_res.subscription); + Ok(()) + } + } + } +} -- cgit v1.2.3