aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server/channel.rs')
-rw-r--r--jsonrpc/src/server/channel.rs39
1 files changed, 29 insertions, 10 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 1498825..f14c1dd 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -7,11 +7,18 @@ use crate::{Error, Result};
pub type SubscriptionID = u32;
pub type ArcChannel = Arc<Channel>;
+pub(crate) struct NewNotification {
+ pub sub_id: SubscriptionID,
+ pub result: serde_json::Value,
+ pub method: String,
+}
+
/// Represents a new subscription
pub struct Subscription {
pub id: SubscriptionID,
parent: Arc<Channel>,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: String,
}
impl Subscription {
@@ -19,15 +26,26 @@ impl Subscription {
fn new(
parent: Arc<Channel>,
id: SubscriptionID,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: &str,
) -> Self {
- Self { parent, id, chan }
+ Self {
+ parent,
+ id,
+ chan,
+ method: method.to_string(),
+ }
}
/// Sends a notification to the subscriber
pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
if self.parent.subs.lock().await.contains(&self.id) {
- self.chan.send((self.id, res)).await?;
+ let nt = NewNotification {
+ sub_id: self.id,
+ result: res,
+ method: self.method.clone(),
+ };
+ self.chan.send(nt).await?;
Ok(())
} else {
Err(Error::SubscriptionNotFound(self.id.to_string()))
@@ -37,13 +55,13 @@ impl Subscription {
/// Represents a channel for creating/removing subscriptions
pub struct Channel {
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
subs: Mutex<Vec<SubscriptionID>>,
}
impl Channel {
/// Creates a new `Channel`
- pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel {
+ pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
Arc::new(Self {
chan,
subs: Mutex::new(Vec::new()),
@@ -51,19 +69,20 @@ impl Channel {
}
/// Creates a new subscription
- pub async fn new_subscription(self: &Arc<Self>) -> Subscription {
+ pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {
let sub_id = random_32();
- let sub = Subscription::new(self.clone(), sub_id, self.chan.clone());
+ let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method);
self.subs.lock().await.push(sub_id);
sub
}
/// Removes a subscription
pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) {
- let i = match self.subs.lock().await.iter().position(|i| i == id) {
+ let mut subs = self.subs.lock().await;
+ let i = match subs.iter().position(|i| i == id) {
Some(i) => i,
None => return,
};
- self.subs.lock().await.remove(i);
+ subs.remove(i);
}
}