aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r--jsonrpc/src/server/mod.rs97
1 files changed, 53 insertions, 44 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 8d5cd2c..9d1f5b7 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -77,19 +77,21 @@ impl Server {
}
};
- let selfc = self.clone();
// Spawns a new task for each new incoming connection
self.task_group.spawn(
- async move {
- loop {
- match selfc.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = selfc.handle_conn(conn).await {
- error!("Handle a new connection: {err}")
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ match this.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = this.handle_conn(conn).await {
+ error!("Handle a new connection: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Accept a new connection: {err}")
}
- }
- Err(err) => {
- error!("Accept a new connection: {err}")
}
}
}
@@ -126,31 +128,33 @@ impl Server {
chan.close();
};
- let conn_cloned = conn.clone();
- let queue_cloned = queue.clone();
// Start listening for new responses in the queue or new notifications
self.task_group.spawn(
- async move {
- loop {
- // The select function will prioritize the first future if both futures are ready.
- // This gives priority to the responses in the response queue.
- match select(queue_cloned.recv(), ch_rx.recv()).await {
- Either::Left(res) => {
- conn_cloned.send(res).await?;
- }
- Either::Right(notification) => {
- let nt = notification?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- conn_cloned.send(serde_json::json!(notification)).await?;
+ {
+ let conn = conn.clone();
+ let queue = queue.clone();
+ async move {
+ loop {
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {notification}");
+ conn.send(serde_json::json!(notification)).await?;
+ }
}
}
}
@@ -169,13 +173,15 @@ impl Server {
chan.close();
};
- let selfc = self.clone();
// Spawn a new task and wait for new requests.
self.task_group.spawn(
- async move {
- loop {
- let msg = conn.recv().await?;
- selfc.new_request(queue.clone(), channel.clone(), msg).await;
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ let msg = conn.recv().await?;
+ this.new_request(queue.clone(), channel.clone(), msg).await;
+ }
}
},
on_complete,
@@ -254,15 +260,18 @@ impl Server {
error!("Handle a new request: {err}");
}
};
- let selfc = self.clone();
+
// Spawns a new task for handling the new request, and push the
// response to the response queue.
self.task_group.spawn(
- async move {
- let response = selfc.handle_request(channel, msg).await;
- debug!("--> {response}");
- queue.push(serde_json::json!(response)).await;
- Ok(())
+ {
+ let this = self.clone();
+ async move {
+ let response = this.handle_request(channel, msg).await;
+ debug!("--> {response}");
+ queue.push(serde_json::json!(response)).await;
+ Ok(())
+ }
},
on_complete,
);