aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 00:19:58 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 00:19:58 +0200
commit2d1a8aea0b9330cd2eaad26eb187644adad6bed9 (patch)
tree6083adaa09ae2f0ef2912f7934cdf0bfafff1654 /jsonrpc/src/client/mod.rs
parentcc1d61c401e52ba3b6cd264c5400fb7ab52522dc (diff)
jsonrpc: spawn task when handle new request
Diffstat (limited to 'jsonrpc/src/client/mod.rs')
-rw-r--r--jsonrpc/src/client/mod.rs45
1 files changed, 29 insertions, 16 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index c9253fc..0d8ccb8 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -12,7 +12,7 @@ use tokio_rustls::rustls;
use karyon_core::{
async_runtime::lock::Mutex,
async_util::{timeout, TaskGroup, TaskResult},
- util::random_32,
+ util::random_64,
};
use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
@@ -54,7 +54,10 @@ impl Client {
let request = self.send_request(method, params, None).await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -84,7 +87,10 @@ impl Client {
let request = self.send_request(method, params, Some(json!(true))).await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -95,7 +101,7 @@ impl Client {
return Err(Error::InvalidMsg("Invalid response id"));
}
- let sub_id = match response.subscription {
+ let sub_id = match response.result {
Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
None => return Err(Error::InvalidMsg("Invalid subscription id")),
};
@@ -116,7 +122,10 @@ impl Client {
.await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -137,11 +146,11 @@ impl Client {
params: T,
subscriber: Option<serde_json::Value>,
) -> Result<message::Request> {
- let id = json!(random_32());
+ let id = random_64();
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
- id,
+ id: json!(id),
method: method.to_string(),
params: json!(params),
subscriber,
@@ -150,9 +159,9 @@ impl Client {
let req_json = serde_json::to_value(&request)?;
match self.timeout {
- Some(s) => {
- let dur = Duration::from_millis(s);
- timeout(dur, self.conn.send(req_json)).await??;
+ Some(ms) => {
+ let t = Duration::from_millis(ms);
+ timeout(t, self.conn.send(req_json)).await??;
}
None => {
self.conn.send(req_json).await?;
@@ -176,15 +185,14 @@ impl Client {
async move {
loop {
let msg = selfc.conn.recv().await?;
-
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
selfc.chan_tx.send(res).await?;
continue;
}
if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
- let sub_id = match nt.subscription.clone() {
- Some(id) => serde_json::from_value::<SubscriptionID>(id)?,
+ let sub_result: message::NotificationResult = match nt.params {
+ Some(p) => serde_json::from_value(p)?,
None => {
return Err(Error::InvalidMsg(
"Invalid notification msg: subscription id not found",
@@ -192,13 +200,18 @@ impl Client {
}
};
- match selfc.subscriptions.lock().await.get(&sub_id) {
+ match selfc
+ .subscriptions
+ .lock()
+ .await
+ .get(&sub_result.subscription)
+ {
Some(s) => {
- s.send(nt.params.unwrap_or(json!(""))).await?;
+ s.send(sub_result.result.unwrap_or(json!(""))).await?;
continue;
}
None => {
- warn!("Receive unknown notification {sub_id}");
+ warn!("Receive unknown notification {}", sub_result.subscription);
continue;
}
}