aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/subscriptions.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/client/subscriptions.rs')
-rw-r--r--jsonrpc/src/client/subscriptions.rs69
1 files changed, 54 insertions, 15 deletions
diff --git a/jsonrpc/src/client/subscriptions.rs b/jsonrpc/src/client/subscriptions.rs
index 9c8a9f4..f3d8cb2 100644
--- a/jsonrpc/src/client/subscriptions.rs
+++ b/jsonrpc/src/client/subscriptions.rs
@@ -1,7 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use async_channel::{Receiver, Sender};
-use log::warn;
use serde_json::json;
use serde_json::Value;
@@ -12,37 +11,77 @@ use crate::{
Error, Result,
};
-/// Type alias for a subscription to receive notifications.
-///
-/// The receiver channel is returned by the `subscribe`
-pub type Subscription = Receiver<Value>;
+/// A subscription established when the client's subscribe to a method
+pub struct Subscription {
+ id: SubscriptionID,
+ rx: Receiver<Value>,
+ tx: Sender<Value>,
+}
+
+impl Subscription {
+ fn new(id: SubscriptionID, buffer_size: usize) -> Arc<Self> {
+ let (tx, rx) = async_channel::bounded(buffer_size);
+ Arc::new(Self { tx, id, rx })
+ }
+
+ pub async fn recv(&self) -> Result<Value> {
+ self.rx.recv().await.map_err(Error::from)
+ }
+
+ pub fn id(&self) -> SubscriptionID {
+ self.id
+ }
+
+ async fn notify(&self, val: Value) -> Result<()> {
+ if self.tx.is_full() {
+ return Err(Error::SubscriptionBufferFull);
+ }
+ self.tx.send(val).await?;
+ Ok(())
+ }
+
+ fn close(&self) {
+ self.tx.close();
+ }
+}
/// Manages subscriptions for the client.
pub(super) struct Subscriptions {
- subs: Mutex<HashMap<SubscriptionID, Sender<Value>>>,
+ subs: Mutex<HashMap<SubscriptionID, Arc<Subscription>>>,
+ sub_buffer_size: usize,
}
impl Subscriptions {
- pub(super) fn new() -> Arc<Self> {
+ /// Creates a new [`Subscriptions`].
+ pub(super) fn new(sub_buffer_size: usize) -> Arc<Self> {
Arc::new(Self {
subs: Mutex::new(HashMap::new()),
+ sub_buffer_size,
})
}
- pub(super) async fn subscribe(self: &Arc<Self>, id: SubscriptionID) -> Subscription {
- let (ch_tx, ch_rx) = async_channel::unbounded();
- self.subs.lock().await.insert(id, ch_tx);
- ch_rx
+ /// Returns a new [`Subscription`]
+ pub(super) async fn subscribe(&self, id: SubscriptionID) -> Arc<Subscription> {
+ let sub = Subscription::new(id, self.sub_buffer_size);
+ self.subs.lock().await.insert(id, sub.clone());
+ sub
}
- pub(super) async fn drop_all(&self) {
- self.subs.lock().await.clear();
+ /// Closes subscription channels and clear the inner map.
+ pub(super) async fn clear(&self) {
+ let mut subs = self.subs.lock().await;
+ for (_, sub) in subs.iter() {
+ sub.close();
+ }
+ subs.clear();
}
+ /// Unsubscribe from the provided subscription id.
pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
self.subs.lock().await.remove(id);
}
+ /// Notifies the subscription about the given notification.
pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
let nt_res: NotificationResult = match nt.params {
Some(ref p) => serde_json::from_value(p.clone())?,
@@ -50,9 +89,9 @@ impl Subscriptions {
};
match self.subs.lock().await.get(&nt_res.subscription) {
- Some(s) => s.send(nt_res.result.unwrap_or(json!(""))).await?,
+ Some(s) => s.notify(nt_res.result.unwrap_or(json!(""))).await?,
None => {
- warn!("Receive unknown notification {}", nt_res.subscription)
+ return Err(Error::InvalidMsg("Unknown notification"));
}
}