From 2d1a8aea0b9330cd2eaad26eb187644adad6bed9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 00:19:58 +0200 Subject: jsonrpc: spawn task when handle new request --- jsonrpc/src/server/channel.rs | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) (limited to 'jsonrpc/src/server/channel.rs') diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 1498825..f14c1dd 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -7,11 +7,18 @@ use crate::{Error, Result}; pub type SubscriptionID = u32; pub type ArcChannel = Arc; +pub(crate) struct NewNotification { + pub sub_id: SubscriptionID, + pub result: serde_json::Value, + pub method: String, +} + /// Represents a new subscription pub struct Subscription { pub id: SubscriptionID, parent: Arc, - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender, + method: String, } impl Subscription { @@ -19,15 +26,26 @@ impl Subscription { fn new( parent: Arc, id: SubscriptionID, - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender, + method: &str, ) -> Self { - Self { parent, id, chan } + Self { + parent, + id, + chan, + method: method.to_string(), + } } /// Sends a notification to the subscriber pub async fn notify(&self, res: serde_json::Value) -> Result<()> { if self.parent.subs.lock().await.contains(&self.id) { - self.chan.send((self.id, res)).await?; + let nt = NewNotification { + sub_id: self.id, + result: res, + method: self.method.clone(), + }; + self.chan.send(nt).await?; Ok(()) } else { Err(Error::SubscriptionNotFound(self.id.to_string())) @@ -37,13 +55,13 @@ impl Subscription { /// Represents a channel for creating/removing subscriptions pub struct Channel { - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender, subs: Mutex>, } impl Channel { /// Creates a new `Channel` - pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel { + pub(crate) fn new(chan: async_channel::Sender) -> ArcChannel { Arc::new(Self { chan, subs: Mutex::new(Vec::new()), @@ -51,19 +69,20 @@ impl Channel { } /// Creates a new subscription - pub async fn new_subscription(self: &Arc) -> Subscription { + pub async fn new_subscription(self: &Arc, method: &str) -> Subscription { let sub_id = random_32(); - let sub = Subscription::new(self.clone(), sub_id, self.chan.clone()); + let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method); self.subs.lock().await.push(sub_id); sub } /// Removes a subscription pub async fn remove_subscription(self: &Arc, id: &SubscriptionID) { - let i = match self.subs.lock().await.iter().position(|i| i == id) { + let mut subs = self.subs.lock().await; + let i = match subs.iter().position(|i| i == id) { Some(i) => i, None => return, }; - self.subs.lock().await.remove(i); + subs.remove(i); } } -- cgit v1.2.3