diff options
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
-rw-r--r-- | jsonrpc/src/server/mod.rs | 124 |
1 files changed, 88 insertions, 36 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 4ebab10..29b1a10 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -47,7 +47,6 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message: error: Some(err), result: None, id, - subscription: None, } } @@ -77,19 +76,31 @@ impl Server { } /// Starts the RPC server - pub async fn start(self: Arc<Self>) -> Result<()> { - loop { - match self.listener.accept().await { - Ok(conn) => { - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") + pub async fn start(self: &Arc<Self>) { + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Accept loop stopped: {err}"); + } + }; + + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + match selfc.listener.accept().await { + Ok(conn) => { + if let Err(err) = selfc.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } } } - Err(err) => { - error!("Failed to accept a new conn: {err}") - } - } - } + }, + on_failure, + ); } /// Shuts down the RPC server @@ -102,6 +113,40 @@ impl Server { let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); + // TODO Avoid depending on channels + let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP); + + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); + let channel = Channel::new(ch_tx); + + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + debug!("Notification loop stopped: {err}"); + } + }; + + let selfc = self.clone(); + let txc = tx.clone(); + self.task_group.spawn( + async move { + loop { + let nt = ch_rx.recv().await?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let response = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + debug!("--> {response}"); + txc.send(serde_json::to_value(response)?).await?; + } + }, + on_failure, + ); + let on_failure = |result: TaskResult<Result<()>>| async move { if let TaskResult::Completed(Err(err)) = result { error!("Connection {} dropped: {}", endpoint, err); @@ -110,30 +155,14 @@ impl Server { } }; - let selfc = self.clone(); - let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP); - let channel = Channel::new(ch_tx); self.task_group.spawn( async move { loop { - match select(conn.recv(), ch_rx.recv()).await { + match select(conn.recv(), rx.recv()).await { Either::Left(msg) => { - // TODO spawn a task - let response = selfc.handle_request(channel.clone(), msg?).await; - debug!("--> {response}"); - conn.send(serde_json::to_value(response)?).await?; - } - Either::Right(msg) => { - let (sub_id, result) = msg?; - let response = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: None, - params: Some(result), - subscription: Some(sub_id.into()), - }; - debug!("--> {response}"); - conn.send(serde_json::to_value(response)?).await?; + selfc.new_request(tx.clone(), channel.clone(), msg?).await; } + Either::Right(msg) => conn.send(msg?).await?, } } }, @@ -176,8 +205,32 @@ impl Server { }) } + /// Spawns a new task for handling a new request + async fn new_request( + self: &Arc<Self>, + sender: async_channel::Sender<serde_json::Value>, + channel: ArcChannel, + msg: serde_json::Value, + ) { + let on_failure = |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Failed to handle a request: {err}"); + } + }; + let selfc = self.clone(); + self.task_group.spawn( + async move { + let response = selfc._handle_request(channel, msg).await; + debug!("--> {response}"); + sender.send(serde_json::json!(response)).await?; + Ok(()) + }, + on_failure, + ); + } + /// Handles a new request - async fn handle_request( + async fn _handle_request( &self, channel: ArcChannel, msg: serde_json::Value, @@ -239,7 +292,6 @@ impl Server { error: None, result: Some(result), id: Some(rpc_msg.id), - subscription: None, } } @@ -262,7 +314,8 @@ impl Server { } }; - let result = match method(channel, rpc_msg.params.clone()).await { + let name = format!("{}.{}", service.name(), method_name); + let result = match method(channel, name, rpc_msg.params.clone()).await { Ok(res) => res, Err(err) => return self.handle_error(err, rpc_msg.id), }; @@ -270,9 +323,8 @@ impl Server { message::Response { jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, - result: None, + result: Some(result), id: Some(rpc_msg.id), - subscription: Some(result), } } |