diff options
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r-- | jsonrpc/src/server/channel.rs | 39 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 124 | ||||
-rw-r--r-- | jsonrpc/src/server/pubsub_service.rs | 6 |
3 files changed, 121 insertions, 48 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 1498825..f14c1dd 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -7,11 +7,18 @@ use crate::{Error, Result}; pub type SubscriptionID = u32; pub type ArcChannel = Arc<Channel>; +pub(crate) struct NewNotification { + pub sub_id: SubscriptionID, + pub result: serde_json::Value, + pub method: String, +} + /// Represents a new subscription pub struct Subscription { pub id: SubscriptionID, parent: Arc<Channel>, - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender<NewNotification>, + method: String, } impl Subscription { @@ -19,15 +26,26 @@ impl Subscription { fn new( parent: Arc<Channel>, id: SubscriptionID, - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender<NewNotification>, + method: &str, ) -> Self { - Self { parent, id, chan } + Self { + parent, + id, + chan, + method: method.to_string(), + } } /// Sends a notification to the subscriber pub async fn notify(&self, res: serde_json::Value) -> Result<()> { if self.parent.subs.lock().await.contains(&self.id) { - self.chan.send((self.id, res)).await?; + let nt = NewNotification { + sub_id: self.id, + result: res, + method: self.method.clone(), + }; + self.chan.send(nt).await?; Ok(()) } else { Err(Error::SubscriptionNotFound(self.id.to_string())) @@ -37,13 +55,13 @@ impl Subscription { /// Represents a channel for creating/removing subscriptions pub struct Channel { - chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>, + chan: async_channel::Sender<NewNotification>, subs: Mutex<Vec<SubscriptionID>>, } impl Channel { /// Creates a new `Channel` - pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel { + pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel { Arc::new(Self { chan, subs: Mutex::new(Vec::new()), @@ -51,19 +69,20 @@ impl Channel { } /// Creates a new subscription - pub async fn new_subscription(self: &Arc<Self>) -> Subscription { + pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription { let sub_id = random_32(); - let sub = Subscription::new(self.clone(), sub_id, self.chan.clone()); + let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method); self.subs.lock().await.push(sub_id); sub } /// Removes a subscription pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) { - let i = match self.subs.lock().await.iter().position(|i| i == id) { + let mut subs = self.subs.lock().await; + let i = match subs.iter().position(|i| i == id) { Some(i) => i, None => return, }; - self.subs.lock().await.remove(i); + subs.remove(i); } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 4ebab10..29b1a10 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -47,7 +47,6 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message: error: Some(err), result: None, id, - subscription: None, } } @@ -77,19 +76,31 @@ impl Server { } /// Starts the RPC server - pub async fn start(self: Arc<Self>) -> Result<()> { - loop { - match self.listener.accept().await { - Ok(conn) => { - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") + pub async fn start(self: &Arc<Self>) { + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Accept loop stopped: {err}"); + } + }; + + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + match selfc.listener.accept().await { + Ok(conn) => { + if let Err(err) = selfc.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } } } - Err(err) => { - error!("Failed to accept a new conn: {err}") - } - } - } + }, + on_failure, + ); } /// Shuts down the RPC server @@ -102,6 +113,40 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); + // TODO Avoid depending on channels + let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP); + + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let channel = Channel::new(ch_tx); + + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + debug!("Notification loop stopped: {err}"); + } + }; + + let selfc = self.clone(); + let txc = tx.clone(); + self.task_group.spawn( + async move { + loop { + let nt = ch_rx.recv().await?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let response = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + debug!("--> {response}"); + txc.send(serde_json::to_value(response)?).await?; + } + }, + on_failure, + ); + let on_failure = |result: TaskResult<Result<()>>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); @@ -110,30 +155,14 @@ impl Server { } }; - let selfc = self.clone(); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); - let channel = Channel::new(ch_tx); self.task_group.spawn( async move { loop { - match select(conn.recv(), ch_rx.recv()).await { + match select(conn.recv(), rx.recv()).await { Either::Left(msg) => { - // TODO spawn a task - let response = selfc.handle_request(channel.clone(), msg?).await; - debug!("--> {response}"); - conn.send(serde_json::to_value(response)?).await?; - } - Either::Right(msg) => { - let (sub_id, result) = msg?; - let response = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: None, - params: Some(result), - subscription: Some(sub_id.into()), - }; - debug!("--> {response}"); - conn.send(serde_json::to_value(response)?).await?; + selfc.new_request(tx.clone(), channel.clone(), msg?).await; } + Either::Right(msg) => conn.send(msg?).await?, } } }, @@ -176,8 +205,32 @@ impl Server { }) } + /// Spawns a new task for handling a new request + async fn new_request( + self: &Arc<Self>, + sender: async_channel::Sender<serde_json::Value>, + channel: ArcChannel, + msg: serde_json::Value, + ) { + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Failed to handle a request: {err}"); + } + }; + let selfc = self.clone(); + self.task_group.spawn( + async move { + let response = selfc._handle_request(channel, msg).await; + debug!("--> {response}"); + sender.send(serde_json::json!(response)).await?; + Ok(()) + }, + on_failure, + ); + } + /// Handles a new request - async fn handle_request( + async fn _handle_request( &self, channel: ArcChannel, msg: serde_json::Value, @@ -239,7 +292,6 @@ impl Server { error: None, result: Some(result), id: Some(rpc_msg.id), - subscription: None, } } @@ -262,7 +314,8 @@ impl Server { } }; - let result = match method(channel, rpc_msg.params.clone()).await { + let name = format!("{}.{}", service.name(), method_name); + let result = match method(channel, name, rpc_msg.params.clone()).await { Ok(res) => res, Err(err) => return self.handle_error(err, rpc_msg.id), }; @@ -270,9 +323,8 @@ impl Server { message::Response { jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, - result: None, + result: Some(result), id: Some(rpc_msg.id), - subscription: Some(result), } } diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index 5b4bf9a..5b3b50b 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -6,7 +6,7 @@ use super::channel::ArcChannel; /// Represents the RPC method pub type PubSubRPCMethod<'a> = - Box<dyn Fn(ArcChannel, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>; + Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>; type PubSubRPCMethodOutput<'a> = Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>; @@ -51,7 +51,9 @@ macro_rules! impl_pubsub_rpc_service { match name { $( stringify!($m) => { - Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, params: serde_json::Value| Box::pin(self.$m(chan, params)))) + Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| { + Box::pin(self.$m(chan, method, params)) + })) } )* _ => None, |