From 2d1a8aea0b9330cd2eaad26eb187644adad6bed9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 00:19:58 +0200 Subject: jsonrpc: spawn task when handle new request --- jsonrpc/src/client/mod.rs | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) (limited to 'jsonrpc/src/client') diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index c9253fc..0d8ccb8 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -12,7 +12,7 @@ use tokio_rustls::rustls; use karyon_core::{ async_runtime::lock::Mutex, async_util::{timeout, TaskGroup, TaskResult}, - util::random_32, + util::random_64, }; use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint}; @@ -54,7 +54,10 @@ impl Client { let request = self.send_request(method, params, None).await?; debug!("--> {request}"); - let response = self.chan_rx.recv().await?; + let response = match self.timeout { + Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, + None => self.chan_rx.recv().await?, + }; debug!("<-- {response}"); if let Some(error) = response.error { @@ -84,7 +87,10 @@ impl Client { let request = self.send_request(method, params, Some(json!(true))).await?; debug!("--> {request}"); - let response = self.chan_rx.recv().await?; + let response = match self.timeout { + Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, + None => self.chan_rx.recv().await?, + }; debug!("<-- {response}"); if let Some(error) = response.error { @@ -95,7 +101,7 @@ impl Client { return Err(Error::InvalidMsg("Invalid response id")); } - let sub_id = match response.subscription { + let sub_id = match response.result { Some(result) => serde_json::from_value::(result)?, None => return Err(Error::InvalidMsg("Invalid subscription id")), }; @@ -116,7 +122,10 @@ impl Client { .await?; debug!("--> {request}"); - let response = self.chan_rx.recv().await?; + let response = match self.timeout { + Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??, + None => self.chan_rx.recv().await?, + }; debug!("<-- {response}"); if let Some(error) = response.error { @@ -137,11 +146,11 @@ impl Client { params: T, subscriber: Option, ) -> Result { - let id = json!(random_32()); + let id = random_64(); let request = message::Request { jsonrpc: message::JSONRPC_VERSION.to_string(), - id, + id: json!(id), method: method.to_string(), params: json!(params), subscriber, @@ -150,9 +159,9 @@ impl Client { let req_json = serde_json::to_value(&request)?; match self.timeout { - Some(s) => { - let dur = Duration::from_millis(s); - timeout(dur, self.conn.send(req_json)).await??; + Some(ms) => { + let t = Duration::from_millis(ms); + timeout(t, self.conn.send(req_json)).await??; } None => { self.conn.send(req_json).await?; @@ -176,15 +185,14 @@ impl Client { async move { loop { let msg = selfc.conn.recv().await?; - if let Ok(res) = serde_json::from_value::(msg.clone()) { selfc.chan_tx.send(res).await?; continue; } if let Ok(nt) = serde_json::from_value::(msg.clone()) { - let sub_id = match nt.subscription.clone() { - Some(id) => serde_json::from_value::(id)?, + let sub_result: message::NotificationResult = match nt.params { + Some(p) => serde_json::from_value(p)?, None => { return Err(Error::InvalidMsg( "Invalid notification msg: subscription id not found", @@ -192,13 +200,18 @@ impl Client { } }; - match selfc.subscriptions.lock().await.get(&sub_id) { + match selfc + .subscriptions + .lock() + .await + .get(&sub_result.subscription) + { Some(s) => { - s.send(nt.params.unwrap_or(json!(""))).await?; + s.send(sub_result.result.unwrap_or(json!(""))).await?; continue; } None => { - warn!("Receive unknown notification {sub_id}"); + warn!("Receive unknown notification {}", sub_result.subscription); continue; } } -- cgit v1.2.3