aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/subscriber.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-17 16:17:17 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-17 16:17:17 +0200
commit72accd61fad0eea312d868b283c6b26da4802ff8 (patch)
treea1b8e0df25df3ea6bc6be5b1fe6ebe1f282150c4 /jsonrpc/src/client/subscriber.rs
parent2d2925c3e21af8ee8f745aa00c0a59dcd9c95df9 (diff)
jsonrpc/client: use serde untagged enum for decoding Notifications and Responses
Diffstat (limited to 'jsonrpc/src/client/subscriber.rs')
-rw-r--r--jsonrpc/src/client/subscriber.rs62
1 files changed, 0 insertions, 62 deletions
diff --git a/jsonrpc/src/client/subscriber.rs b/jsonrpc/src/client/subscriber.rs
deleted file mode 100644
index 168f16e..0000000
--- a/jsonrpc/src/client/subscriber.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use std::collections::HashMap;
-
-use async_channel::{Receiver, Sender};
-use log::warn;
-use serde_json::json;
-
-use karyon_core::async_runtime::lock::Mutex;
-
-use crate::{
- message::{Notification, NotificationResult, SubscriptionID},
- Error, Result,
-};
-
-/// Manages subscriptions for the client.
-pub(super) struct Subscriber {
- subs: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
-}
-
-/// Type alias for a subscription to receive notifications.
-///
-/// The receiver channel is returned by the `subscribe`
-pub type Subscription = Receiver<serde_json::Value>;
-
-impl Subscriber {
- pub(super) fn new() -> Self {
- Self {
- subs: Mutex::new(HashMap::new()),
- }
- }
-
- pub(super) async fn subscribe(&self, id: SubscriptionID) -> Receiver<serde_json::Value> {
- let (ch_tx, ch_rx) = async_channel::unbounded();
- self.subs.lock().await.insert(id, ch_tx);
- ch_rx
- }
-
- pub(super) async fn drop_all(&self) {
- self.subs.lock().await.clear();
- }
-
- /// Unsubscribe
- pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
- self.subs.lock().await.remove(id);
- }
-
- pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
- let nt_res: NotificationResult = match nt.params {
- Some(ref p) => serde_json::from_value(p.clone())?,
- None => return Err(Error::InvalidMsg("Invalid notification msg")),
- };
- match self.subs.lock().await.get(&nt_res.subscription) {
- Some(s) => {
- s.send(nt_res.result.unwrap_or(json!(""))).await?;
- Ok(())
- }
- None => {
- warn!("Receive unknown notification {}", nt_res.subscription);
- Ok(())
- }
- }
- }
-}