diff options
Diffstat (limited to 'jsonrpc/src')
| -rw-r--r-- | jsonrpc/src/client/mod.rs | 26 | ||||
| -rw-r--r-- | jsonrpc/src/error.rs | 8 | ||||
| -rw-r--r-- | jsonrpc/src/server/mod.rs | 97 | 
3 files changed, 73 insertions, 58 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 43ce5fc..6f6bd97 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -260,20 +260,26 @@ impl Client {      }      fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) { -        let selfc = self.clone(); -        let on_complete = |result: TaskResult<Result<()>>| async move { -            if let TaskResult::Completed(Err(err)) = result { -                error!("Client stopped: {err}"); +        let on_complete = { +            let this = self.clone(); +            |result: TaskResult<Result<()>>| async move { +                if let TaskResult::Completed(Err(err)) = result { +                    error!("Client stopped: {err}"); +                } +                this.disconnect.store(true, Ordering::Relaxed); +                this.subscriptions.clear().await; +                this.message_dispatcher.clear().await;              } -            selfc.disconnect.store(true, Ordering::Relaxed); -            selfc.subscriptions.clear().await; -            selfc.message_dispatcher.clear().await;          }; -        let selfc = self.clone();          // Spawn a new task -        self.task_group -            .spawn(selfc.background_loop(conn), on_complete); +        self.task_group.spawn( +            { +                let this = self.clone(); +                async move { this.background_loop(conn).await } +            }, +            on_complete, +        );      }      async fn background_loop(self: Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> { diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 1b38519..48d0139 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -45,7 +45,7 @@ pub enum Error {      ChannelRecv(#[from] async_channel::RecvError),      #[error("Channel send  Error: {0}")] -    ChannelSend(&'static str), +    ChannelSend(String),      #[error("Unexpected Error: {0}")]      General(&'static str), @@ -59,7 +59,7 @@ pub enum Error {  impl<T> From<async_channel::SendError<T>> for Error {      fn from(error: async_channel::SendError<T>) -> Self { -        Error::ChannelSend(error.to_string().leak()) +        Error::ChannelSend(error.to_string())      }  } @@ -78,7 +78,7 @@ pub enum RPCError {      InvalidRequest(&'static str),      #[error("Parse Error: {0}")] -    ParseError(&'static str), +    ParseError(String),      #[error("Internal Error")]      InternalError, @@ -86,6 +86,6 @@ pub enum RPCError {  impl From<serde_json::Error> for RPCError {      fn from(error: serde_json::Error) -> Self { -        RPCError::ParseError(error.to_string().leak()) +        RPCError::ParseError(error.to_string())      }  } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 8d5cd2c..9d1f5b7 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -77,19 +77,21 @@ impl Server {              }          }; -        let selfc = self.clone();          // Spawns a new task for each new incoming connection          self.task_group.spawn( -            async move { -                loop { -                    match selfc.listener.accept().await { -                        Ok(conn) => { -                            if let Err(err) = selfc.handle_conn(conn).await { -                                error!("Handle a new connection: {err}") +            { +                let this = self.clone(); +                async move { +                    loop { +                        match this.listener.accept().await { +                            Ok(conn) => { +                                if let Err(err) = this.handle_conn(conn).await { +                                    error!("Handle a new connection: {err}") +                                } +                            } +                            Err(err) => { +                                error!("Accept a new connection: {err}")                              } -                        } -                        Err(err) => { -                            error!("Accept a new connection: {err}")                          }                      }                  } @@ -126,31 +128,33 @@ impl Server {              chan.close();          }; -        let conn_cloned = conn.clone(); -        let queue_cloned = queue.clone();          // Start listening for new responses in the queue or new notifications          self.task_group.spawn( -            async move { -                loop { -                    // The select function will prioritize the first future if both futures are ready. -                    // This gives priority to the responses in the response queue. -                    match select(queue_cloned.recv(), ch_rx.recv()).await { -                        Either::Left(res) => { -                            conn_cloned.send(res).await?; -                        } -                        Either::Right(notification) => { -                            let nt = notification?; -                            let params = Some(serde_json::json!(message::NotificationResult { -                                subscription: nt.sub_id, -                                result: Some(nt.result), -                            })); -                            let notification = message::Notification { -                                jsonrpc: message::JSONRPC_VERSION.to_string(), -                                method: nt.method, -                                params, -                            }; -                            debug!("--> {notification}"); -                            conn_cloned.send(serde_json::json!(notification)).await?; +            { +                let conn = conn.clone(); +                let queue = queue.clone(); +                async move { +                    loop { +                        // The select function will prioritize the first future if both futures are ready. +                        // This gives priority to the responses in the response queue. +                        match select(queue.recv(), ch_rx.recv()).await { +                            Either::Left(res) => { +                                conn.send(res).await?; +                            } +                            Either::Right(notification) => { +                                let nt = notification?; +                                let params = Some(serde_json::json!(message::NotificationResult { +                                    subscription: nt.sub_id, +                                    result: Some(nt.result), +                                })); +                                let notification = message::Notification { +                                    jsonrpc: message::JSONRPC_VERSION.to_string(), +                                    method: nt.method, +                                    params, +                                }; +                                debug!("--> {notification}"); +                                conn.send(serde_json::json!(notification)).await?; +                            }                          }                      }                  } @@ -169,13 +173,15 @@ impl Server {              chan.close();          }; -        let selfc = self.clone();          // Spawn a new task and wait for new requests.          self.task_group.spawn( -            async move { -                loop { -                    let msg = conn.recv().await?; -                    selfc.new_request(queue.clone(), channel.clone(), msg).await; +            { +                let this = self.clone(); +                async move { +                    loop { +                        let msg = conn.recv().await?; +                        this.new_request(queue.clone(), channel.clone(), msg).await; +                    }                  }              },              on_complete, @@ -254,15 +260,18 @@ impl Server {                  error!("Handle a new request: {err}");              }          }; -        let selfc = self.clone(); +          // Spawns a new task for handling the new request, and push the          // response to the response queue.          self.task_group.spawn( -            async move { -                let response = selfc.handle_request(channel, msg).await; -                debug!("--> {response}"); -                queue.push(serde_json::json!(response)).await; -                Ok(()) +            { +                let this = self.clone(); +                async move { +                    let response = this.handle_request(channel, msg).await; +                    debug!("--> {response}"); +                    queue.push(serde_json::json!(response)).await; +                    Ok(()) +                }              },              on_complete,          );  | 
