aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/client/mod.rs')
-rw-r--r--jsonrpc/src/client/mod.rs68
1 files changed, 38 insertions, 30 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 9c07509..eddba19 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -1,11 +1,12 @@
pub mod builder;
mod message_dispatcher;
-mod subscriber;
+mod subscriptions;
+
+use std::{sync::Arc, time::Duration};
use log::{debug, error};
-use serde::{de::DeserializeOwned, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
-use std::{sync::Arc, time::Duration};
use karyon_core::{
async_util::{timeout, TaskGroup, TaskResult},
@@ -19,8 +20,8 @@ use crate::{
};
use message_dispatcher::MessageDispatcher;
-use subscriber::Subscriber;
-pub use subscriber::Subscription;
+pub use subscriptions::Subscription;
+use subscriptions::Subscriptions;
type RequestID = u32;
@@ -30,7 +31,14 @@ pub struct Client {
timeout: Option<u64>,
message_dispatcher: MessageDispatcher,
task_group: TaskGroup,
- subscriber: Subscriber,
+ subscriptions: Arc<Subscriptions>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(untagged)]
+enum NewMsg {
+ Notification(message::Notification),
+ Response(message::Response),
}
impl Client {
@@ -65,9 +73,9 @@ impl Client {
None => return Err(Error::InvalidMsg("Invalid subscription id")),
};
- let rx = self.subscriber.subscribe(sub_id).await;
+ let sub = self.subscriptions.subscribe(sub_id).await;
- Ok((sub_id, rx))
+ Ok((sub_id, sub))
}
/// Unsubscribes from the provided method, waits for the response, and returns the result.
@@ -76,7 +84,7 @@ impl Client {
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
let _ = self.send_request(method, sub_id).await?;
- self.subscriber.unsubscribe(&sub_id).await;
+ self.subscriptions.unsubscribe(&sub_id).await;
Ok(())
}
@@ -134,11 +142,12 @@ impl Client {
let selfc = self.clone();
let on_complete = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
- error!("background receiving stopped: {err}");
+ error!("Background receiving loop stopped: {err}");
}
// Drop all subscription
- selfc.subscriber.drop_all().await;
+ selfc.subscriptions.drop_all().await;
};
+
let selfc = self.clone();
// Spawn a new task for listing to new coming messages.
self.task_group.spawn(
@@ -146,10 +155,8 @@ impl Client {
loop {
let msg = selfc.conn.recv().await?;
if let Err(err) = selfc.handle_msg(msg).await {
- error!(
- "Handle a msg from the endpoint {} : {err}",
- selfc.conn.peer_endpoint()?
- );
+ let endpoint = selfc.conn.peer_endpoint()?;
+ error!("Handle a new msg from the endpoint {endpoint} : {err}",);
}
}
},
@@ -158,21 +165,22 @@ impl Client {
}
async fn handle_msg(&self, msg: serde_json::Value) -> Result<()> {
- // Check if the received message is of type Response
- if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
- debug!("<-- {res}");
- self.message_dispatcher.dispatch(res).await?;
- return Ok(());
- }
-
- // Check if the received message is of type Notification
- if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
- debug!("<-- {nt}");
- self.subscriber.notify(nt).await?;
- return Ok(());
+ match serde_json::from_value::<NewMsg>(msg.clone()) {
+ Ok(msg) => match msg {
+ NewMsg::Response(res) => {
+ debug!("<-- {res}");
+ self.message_dispatcher.dispatch(res).await
+ }
+ NewMsg::Notification(nt) => {
+ debug!("<-- {nt}");
+ self.subscriptions.notify(nt).await?;
+ Ok(())
+ }
+ },
+ Err(_) => {
+ error!("Receive unexpected msg: {msg}");
+ Err(Error::InvalidMsg("Unexpected msg"))
+ }
}
-
- error!("Receive unexpected msg: {msg}");
- Err(Error::InvalidMsg("Unexpected msg"))
}
}