diff options
Diffstat (limited to 'jsonrpc')
-rw-r--r-- | jsonrpc/examples/pubsub_client.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/client/mod.rs | 26 | ||||
-rw-r--r-- | jsonrpc/src/error.rs | 8 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 97 |
4 files changed, 73 insertions, 60 deletions
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index 823089d..bb3ce4c 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -16,8 +16,6 @@ async fn run_client() { .await .expect("Build a client"); - smol::spawn(async move {}).detach(); - let sub = client .subscribe("Calc.log_subscribe", ()) .await diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 43ce5fc..6f6bd97 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -260,20 +260,26 @@ impl Client { } fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) { - let selfc = self.clone(); - let on_complete = |result: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = result { - error!("Client stopped: {err}"); + let on_complete = { + let this = self.clone(); + |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Client stopped: {err}"); + } + this.disconnect.store(true, Ordering::Relaxed); + this.subscriptions.clear().await; + this.message_dispatcher.clear().await; } - selfc.disconnect.store(true, Ordering::Relaxed); - selfc.subscriptions.clear().await; - selfc.message_dispatcher.clear().await; }; - let selfc = self.clone(); // Spawn a new task - self.task_group - .spawn(selfc.background_loop(conn), on_complete); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.background_loop(conn).await } + }, + on_complete, + ); } async fn background_loop(self: Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> { diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 1b38519..48d0139 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -45,7 +45,7 @@ pub enum Error { ChannelRecv(#[from] async_channel::RecvError), #[error("Channel send Error: {0}")] - ChannelSend(&'static str), + ChannelSend(String), #[error("Unexpected Error: {0}")] General(&'static str), @@ -59,7 +59,7 @@ pub enum Error { impl<T> From<async_channel::SendError<T>> for Error { fn from(error: async_channel::SendError<T>) -> Self { - Error::ChannelSend(error.to_string().leak()) + Error::ChannelSend(error.to_string()) } } @@ -78,7 +78,7 @@ pub enum RPCError { InvalidRequest(&'static str), #[error("Parse Error: {0}")] - ParseError(&'static str), + ParseError(String), #[error("Internal Error")] InternalError, @@ -86,6 +86,6 @@ pub enum RPCError { impl From<serde_json::Error> for RPCError { fn from(error: serde_json::Error) -> Self { - RPCError::ParseError(error.to_string().leak()) + RPCError::ParseError(error.to_string()) } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 8d5cd2c..9d1f5b7 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -77,19 +77,21 @@ impl Server { } }; - let selfc = self.clone(); // Spawns a new task for each new incoming connection self.task_group.spawn( - async move { - loop { - match selfc.listener.accept().await { - Ok(conn) => { - if let Err(err) = selfc.handle_conn(conn).await { - error!("Handle a new connection: {err}") + { + let this = self.clone(); + async move { + loop { + match this.listener.accept().await { + Ok(conn) => { + if let Err(err) = this.handle_conn(conn).await { + error!("Handle a new connection: {err}") + } + } + Err(err) => { + error!("Accept a new connection: {err}") } - } - Err(err) => { - error!("Accept a new connection: {err}") } } } @@ -126,31 +128,33 @@ impl Server { chan.close(); }; - let conn_cloned = conn.clone(); - let queue_cloned = queue.clone(); // Start listening for new responses in the queue or new notifications self.task_group.spawn( - async move { - loop { - // The select function will prioritize the first future if both futures are ready. - // This gives priority to the responses in the response queue. - match select(queue_cloned.recv(), ch_rx.recv()).await { - Either::Left(res) => { - conn_cloned.send(res).await?; - } - Either::Right(notification) => { - let nt = notification?; - let params = Some(serde_json::json!(message::NotificationResult { - subscription: nt.sub_id, - result: Some(nt.result), - })); - let notification = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: nt.method, - params, - }; - debug!("--> {notification}"); - conn_cloned.send(serde_json::json!(notification)).await?; + { + let conn = conn.clone(); + let queue = queue.clone(); + async move { + loop { + // The select function will prioritize the first future if both futures are ready. + // This gives priority to the responses in the response queue. + match select(queue.recv(), ch_rx.recv()).await { + Either::Left(res) => { + conn.send(res).await?; + } + Either::Right(notification) => { + let nt = notification?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let notification = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + debug!("--> {notification}"); + conn.send(serde_json::json!(notification)).await?; + } } } } @@ -169,13 +173,15 @@ impl Server { chan.close(); }; - let selfc = self.clone(); // Spawn a new task and wait for new requests. self.task_group.spawn( - async move { - loop { - let msg = conn.recv().await?; - selfc.new_request(queue.clone(), channel.clone(), msg).await; + { + let this = self.clone(); + async move { + loop { + let msg = conn.recv().await?; + this.new_request(queue.clone(), channel.clone(), msg).await; + } } }, on_complete, @@ -254,15 +260,18 @@ impl Server { error!("Handle a new request: {err}"); } }; - let selfc = self.clone(); + // Spawns a new task for handling the new request, and push the // response to the response queue. self.task_group.spawn( - async move { - let response = selfc.handle_request(channel, msg).await; - debug!("--> {response}"); - queue.push(serde_json::json!(response)).await; - Ok(()) + { + let this = self.clone(); + async move { + let response = this.handle_request(channel, msg).await; + debug!("--> {response}"); + queue.push(serde_json::json!(response)).await; + Ok(()) + } }, on_complete, ); |