aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
-rw-r--r--jsonrpc/src/server/mod.rs124
1 files changed, 88 insertions, 36 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 4ebab10..29b1a10 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -47,7 +47,6 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message:
error: Some(err),
result: None,
id,
- subscription: None,
}
}
@@ -77,19 +76,31 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: Arc<Self>) -> Result<()> {
- loop {
- match self.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = self.handle_conn(conn).await {
- error!("Failed to handle a new conn: {err}")
+ pub async fn start(self: &Arc<Self>) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Accept loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ match selfc.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = selfc.handle_conn(conn).await {
+ error!("Failed to handle a new conn: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Failed to accept a new conn: {err}")
+ }
}
}
- Err(err) => {
- error!("Failed to accept a new conn: {err}")
- }
- }
- }
+ },
+ on_failure,
+ );
}
/// Shuts down the RPC server
@@ -102,6 +113,40 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
+ // TODO Avoid depending on channels
+ let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP);
+
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let channel = Channel::new(ch_tx);
+
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ debug!("Notification loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ let txc = tx.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let nt = ch_rx.recv().await?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let response = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {response}");
+ txc.send(serde_json::to_value(response)?).await?;
+ }
+ },
+ on_failure,
+ );
+
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
@@ -110,30 +155,14 @@ impl Server {
}
};
- let selfc = self.clone();
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
- let channel = Channel::new(ch_tx);
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), ch_rx.recv()).await {
+ match select(conn.recv(), rx.recv()).await {
Either::Left(msg) => {
- // TODO spawn a task
- let response = selfc.handle_request(channel.clone(), msg?).await;
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
- }
- Either::Right(msg) => {
- let (sub_id, result) = msg?;
- let response = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: None,
- params: Some(result),
- subscription: Some(sub_id.into()),
- };
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
+ selfc.new_request(tx.clone(), channel.clone(), msg?).await;
}
+ Either::Right(msg) => conn.send(msg?).await?,
}
}
},
@@ -176,8 +205,32 @@ impl Server {
})
}
+ /// Spawns a new task for handling a new request
+ async fn new_request(
+ self: &Arc<Self>,
+ sender: async_channel::Sender<serde_json::Value>,
+ channel: ArcChannel,
+ msg: serde_json::Value,
+ ) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Failed to handle a request: {err}");
+ }
+ };
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ let response = selfc._handle_request(channel, msg).await;
+ debug!("--> {response}");
+ sender.send(serde_json::json!(response)).await?;
+ Ok(())
+ },
+ on_failure,
+ );
+ }
+
/// Handles a new request
- async fn handle_request(
+ async fn _handle_request(
&self,
channel: ArcChannel,
msg: serde_json::Value,
@@ -239,7 +292,6 @@ impl Server {
error: None,
result: Some(result),
id: Some(rpc_msg.id),
- subscription: None,
}
}
@@ -262,7 +314,8 @@ impl Server {
}
};
- let result = match method(channel, rpc_msg.params.clone()).await {
+ let name = format!("{}.{}", service.name(), method_name);
+ let result = match method(channel, name, rpc_msg.params.clone()).await {
Ok(res) => res,
Err(err) => return self.handle_error(err, rpc_msg.id),
};
@@ -270,9 +323,8 @@ impl Server {
message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
- result: None,
+ result: Some(result),
id: Some(rpc_msg.id),
- subscription: Some(result),
}
}