aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc')
-rw-r--r--jsonrpc/examples/pubsub_client.rs2
-rw-r--r--jsonrpc/src/client/mod.rs26
-rw-r--r--jsonrpc/src/error.rs8
-rw-r--r--jsonrpc/src/server/mod.rs97
4 files changed, 73 insertions, 60 deletions
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
index 823089d..bb3ce4c 100644
--- a/jsonrpc/examples/pubsub_client.rs
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -16,8 +16,6 @@ async fn run_client() {
.await
.expect("Build a client");
- smol::spawn(async move {}).detach();
-
let sub = client
.subscribe("Calc.log_subscribe", ())
.await
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 43ce5fc..6f6bd97 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -260,20 +260,26 @@ impl Client {
}
fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) {
- let selfc = self.clone();
- let on_complete = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Client stopped: {err}");
+ let on_complete = {
+ let this = self.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Client stopped: {err}");
+ }
+ this.disconnect.store(true, Ordering::Relaxed);
+ this.subscriptions.clear().await;
+ this.message_dispatcher.clear().await;
}
- selfc.disconnect.store(true, Ordering::Relaxed);
- selfc.subscriptions.clear().await;
- selfc.message_dispatcher.clear().await;
};
- let selfc = self.clone();
// Spawn a new task
- self.task_group
- .spawn(selfc.background_loop(conn), on_complete);
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.background_loop(conn).await }
+ },
+ on_complete,
+ );
}
async fn background_loop(self: Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> {
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index 1b38519..48d0139 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -45,7 +45,7 @@ pub enum Error {
ChannelRecv(#[from] async_channel::RecvError),
#[error("Channel send Error: {0}")]
- ChannelSend(&'static str),
+ ChannelSend(String),
#[error("Unexpected Error: {0}")]
General(&'static str),
@@ -59,7 +59,7 @@ pub enum Error {
impl<T> From<async_channel::SendError<T>> for Error {
fn from(error: async_channel::SendError<T>) -> Self {
- Error::ChannelSend(error.to_string().leak())
+ Error::ChannelSend(error.to_string())
}
}
@@ -78,7 +78,7 @@ pub enum RPCError {
InvalidRequest(&'static str),
#[error("Parse Error: {0}")]
- ParseError(&'static str),
+ ParseError(String),
#[error("Internal Error")]
InternalError,
@@ -86,6 +86,6 @@ pub enum RPCError {
impl From<serde_json::Error> for RPCError {
fn from(error: serde_json::Error) -> Self {
- RPCError::ParseError(error.to_string().leak())
+ RPCError::ParseError(error.to_string())
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 8d5cd2c..9d1f5b7 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -77,19 +77,21 @@ impl Server {
}
};
- let selfc = self.clone();
// Spawns a new task for each new incoming connection
self.task_group.spawn(
- async move {
- loop {
- match selfc.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = selfc.handle_conn(conn).await {
- error!("Handle a new connection: {err}")
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ match this.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = this.handle_conn(conn).await {
+ error!("Handle a new connection: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Accept a new connection: {err}")
}
- }
- Err(err) => {
- error!("Accept a new connection: {err}")
}
}
}
@@ -126,31 +128,33 @@ impl Server {
chan.close();
};
- let conn_cloned = conn.clone();
- let queue_cloned = queue.clone();
// Start listening for new responses in the queue or new notifications
self.task_group.spawn(
- async move {
- loop {
- // The select function will prioritize the first future if both futures are ready.
- // This gives priority to the responses in the response queue.
- match select(queue_cloned.recv(), ch_rx.recv()).await {
- Either::Left(res) => {
- conn_cloned.send(res).await?;
- }
- Either::Right(notification) => {
- let nt = notification?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- conn_cloned.send(serde_json::json!(notification)).await?;
+ {
+ let conn = conn.clone();
+ let queue = queue.clone();
+ async move {
+ loop {
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {notification}");
+ conn.send(serde_json::json!(notification)).await?;
+ }
}
}
}
@@ -169,13 +173,15 @@ impl Server {
chan.close();
};
- let selfc = self.clone();
// Spawn a new task and wait for new requests.
self.task_group.spawn(
- async move {
- loop {
- let msg = conn.recv().await?;
- selfc.new_request(queue.clone(), channel.clone(), msg).await;
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ let msg = conn.recv().await?;
+ this.new_request(queue.clone(), channel.clone(), msg).await;
+ }
}
},
on_complete,
@@ -254,15 +260,18 @@ impl Server {
error!("Handle a new request: {err}");
}
};
- let selfc = self.clone();
+
// Spawns a new task for handling the new request, and push the
// response to the response queue.
self.task_group.spawn(
- async move {
- let response = selfc.handle_request(channel, msg).await;
- debug!("--> {response}");
- queue.push(serde_json::json!(response)).await;
- Ok(())
+ {
+ let this = self.clone();
+ async move {
+ let response = this.handle_request(channel, msg).await;
+ debug!("--> {response}");
+ queue.push(serde_json::json!(response)).await;
+ Ok(())
+ }
},
on_complete,
);