aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 01:27:45 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 01:27:45 +0200
commitd51f212628f4996d754745b4904a1994ba39a2d0 (patch)
treed7bae571e7d3fdb9715897feae0c1500fb0ee78f
parent7be7f59d5caf38ca0cd7a12a937a2cfdca0268d7 (diff)
jsonrpc: remove subscriber field from Request & clean up
-rw-r--r--jsonrpc/src/client/mod.rs11
-rw-r--r--jsonrpc/src/message.rs6
-rw-r--r--jsonrpc/src/server/mod.rs111
3 files changed, 36 insertions, 92 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 0d8ccb8..5e3a24f 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -51,7 +51,7 @@ impl Client {
method: &str,
params: T,
) -> Result<V> {
- let request = self.send_request(method, params, None).await?;
+ let request = self.send_request(method, params).await?;
debug!("--> {request}");
let response = match self.timeout {
@@ -84,7 +84,7 @@ impl Client {
method: &str,
params: T,
) -> Result<(SubscriptionID, Subscription)> {
- let request = self.send_request(method, params, Some(json!(true))).await?;
+ let request = self.send_request(method, params).await?;
debug!("--> {request}");
let response = match self.timeout {
@@ -117,9 +117,7 @@ impl Client {
/// This function sends an unsubscription request for the specified method
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
- let request = self
- .send_request(method, json!(sub_id), Some(json!(true)))
- .await?;
+ let request = self.send_request(method, json!(sub_id)).await?;
debug!("--> {request}");
let response = match self.timeout {
@@ -144,7 +142,6 @@ impl Client {
&self,
method: &str,
params: T,
- subscriber: Option<serde_json::Value>,
) -> Result<message::Request> {
let id = random_64();
@@ -153,7 +150,6 @@ impl Client {
id: json!(id),
method: method.to_string(),
params: json!(params),
- subscriber,
};
let req_json = serde_json::to_value(&request)?;
@@ -187,6 +183,7 @@ impl Client {
let msg = selfc.conn.recv().await?;
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
selfc.chan_tx.send(res).await?;
+
continue;
}
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 2cf28b1..55f8314 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -27,8 +27,6 @@ pub struct Request {
pub method: String,
pub params: serde_json::Value,
pub id: serde_json::Value,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub subscriber: Option<serde_json::Value>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -69,8 +67,8 @@ impl std::fmt::Display for Request {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}, subscribe: {:?}}}",
- self.jsonrpc, self.method, self.params, self.id, self.subscriber
+ "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}}}",
+ self.jsonrpc, self.method, self.params, self.id,
)
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 29b1a10..7e9e969 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -220,7 +220,7 @@ impl Server {
let selfc = self.clone();
self.task_group.spawn(
async move {
- let response = selfc._handle_request(channel, msg).await;
+ let response = selfc.handle_request(channel, msg).await;
debug!("--> {response}");
sender.send(serde_json::json!(response)).await?;
Ok(())
@@ -230,7 +230,7 @@ impl Server {
}
/// Handles a new request
- async fn _handle_request(
+ async fn handle_request(
&self,
channel: ArcChannel,
msg: serde_json::Value,
@@ -240,92 +240,41 @@ impl Server {
SanityCheckResult::ErrRes(res) => return res,
};
- if req.msg.subscriber.is_some() {
- match self.pubsub_services.get(&req.srvc_name) {
- Some(s) => {
- self.handle_pubsub_request(channel, s, &req.method_name, req.msg)
- .await
- }
- None => pack_err_res(
- message::METHOD_NOT_FOUND_ERROR_CODE,
- METHOD_NOT_FOUND_ERROR_MSG,
- Some(req.msg.id),
- ),
- }
- } else {
- match self.services.get(&req.srvc_name) {
- Some(s) => self.handle_call_request(s, &req.method_name, req.msg).await,
- None => pack_err_res(
- message::METHOD_NOT_FOUND_ERROR_CODE,
- METHOD_NOT_FOUND_ERROR_MSG,
- Some(req.msg.id),
- ),
- }
- }
- }
-
- /// Handles a call request
- async fn handle_call_request(
- &self,
- service: &Arc<dyn RPCService + 'static>,
- method_name: &str,
- rpc_msg: message::Request,
- ) -> message::Response {
- let method = match service.get_method(method_name) {
- Some(m) => m,
- None => {
- return pack_err_res(
- message::METHOD_NOT_FOUND_ERROR_CODE,
- METHOD_NOT_FOUND_ERROR_MSG,
- Some(rpc_msg.id),
- );
- }
- };
-
- let result = match method(rpc_msg.params.clone()).await {
- Ok(res) => res,
- Err(err) => return self.handle_error(err, rpc_msg.id),
- };
-
- message::Response {
+ let mut response = message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
- result: Some(result),
- id: Some(rpc_msg.id),
- }
- }
+ result: None,
+ id: Some(req.msg.id.clone()),
+ };
- /// Handles a pubsub request
- async fn handle_pubsub_request(
- &self,
- channel: ArcChannel,
- service: &Arc<dyn PubSubRPCService + 'static>,
- method_name: &str,
- rpc_msg: message::Request,
- ) -> message::Response {
- let method = match service.get_pubsub_method(method_name) {
- Some(m) => m,
- None => {
- return pack_err_res(
- message::METHOD_NOT_FOUND_ERROR_CODE,
- METHOD_NOT_FOUND_ERROR_MSG,
- Some(rpc_msg.id),
- );
+ if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
+ if let Some(method) = service.get_pubsub_method(&req.method_name) {
+ let name = format!("{}.{}", service.name(), req.method_name);
+ response.result = match method(channel, name, req.msg.params.clone()).await {
+ Ok(res) => Some(res),
+ Err(err) => return self.handle_error(err, req.msg.id),
+ };
+
+ return response;
}
- };
+ }
- let name = format!("{}.{}", service.name(), method_name);
- let result = match method(channel, name, rpc_msg.params.clone()).await {
- Ok(res) => res,
- Err(err) => return self.handle_error(err, rpc_msg.id),
- };
+ if let Some(service) = self.services.get(&req.srvc_name) {
+ if let Some(method) = service.get_method(&req.method_name) {
+ response.result = match method(req.msg.params.clone()).await {
+ Ok(res) => Some(res),
+ Err(err) => return self.handle_error(err, req.msg.id),
+ };
- message::Response {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- error: None,
- result: Some(result),
- id: Some(rpc_msg.id),
+ return response;
+ }
}
+
+ pack_err_res(
+ message::METHOD_NOT_FOUND_ERROR_CODE,
+ METHOD_NOT_FOUND_ERROR_MSG,
+ Some(req.msg.id),
+ )
}
fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response {