From d51f212628f4996d754745b4904a1994ba39a2d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 01:27:45 +0200 Subject: jsonrpc: remove subscriber field from Request & clean up --- jsonrpc/src/client/mod.rs | 11 ++--- jsonrpc/src/message.rs | 6 +-- jsonrpc/src/server/mod.rs | 111 +++++++++++++--------------------------------- 3 files changed, 36 insertions(+), 92 deletions(-) (limited to 'jsonrpc') diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 0d8ccb8..5e3a24f 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -51,7 +51,7 @@ impl Client { method: &str, params: T, ) -> Result { - let request = self.send_request(method, params, None).await?; + let request = self.send_request(method, params).await?; debug!("--> {request}"); let response = match self.timeout { @@ -84,7 +84,7 @@ impl Client { method: &str, params: T, ) -> Result<(SubscriptionID, Subscription)> { - let request = self.send_request(method, params, Some(json!(true))).await?; + let request = self.send_request(method, params).await?; debug!("--> {request}"); let response = match self.timeout { @@ -117,9 +117,7 @@ impl Client { /// This function sends an unsubscription request for the specified method /// and subscription ID. It waits for the response to confirm the unsubscription. pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> { - let request = self - .send_request(method, json!(sub_id), Some(json!(true))) - .await?; + let request = self.send_request(method, json!(sub_id)).await?; debug!("--> {request}"); let response = match self.timeout { @@ -144,7 +142,6 @@ impl Client { &self, method: &str, params: T, - subscriber: Option, ) -> Result { let id = random_64(); @@ -153,7 +150,6 @@ impl Client { id: json!(id), method: method.to_string(), params: json!(params), - subscriber, }; let req_json = serde_json::to_value(&request)?; @@ -187,6 +183,7 @@ impl Client { let msg = selfc.conn.recv().await?; if let Ok(res) = serde_json::from_value::(msg.clone()) { selfc.chan_tx.send(res).await?; + continue; } diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 2cf28b1..55f8314 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -27,8 +27,6 @@ pub struct Request { pub method: String, pub params: serde_json::Value, pub id: serde_json::Value, - #[serde(skip_serializing_if = "Option::is_none")] - pub subscriber: Option, } #[derive(Debug, Serialize, Deserialize)] @@ -69,8 +67,8 @@ impl std::fmt::Display for Request { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}, subscribe: {:?}}}", - self.jsonrpc, self.method, self.params, self.id, self.subscriber + "{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}}}", + self.jsonrpc, self.method, self.params, self.id, ) } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 29b1a10..7e9e969 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -220,7 +220,7 @@ impl Server { let selfc = self.clone(); self.task_group.spawn( async move { - let response = selfc._handle_request(channel, msg).await; + let response = selfc.handle_request(channel, msg).await; debug!("--> {response}"); sender.send(serde_json::json!(response)).await?; Ok(()) @@ -230,7 +230,7 @@ impl Server { } /// Handles a new request - async fn _handle_request( + async fn handle_request( &self, channel: ArcChannel, msg: serde_json::Value, @@ -240,92 +240,41 @@ impl Server { SanityCheckResult::ErrRes(res) => return res, }; - if req.msg.subscriber.is_some() { - match self.pubsub_services.get(&req.srvc_name) { - Some(s) => { - self.handle_pubsub_request(channel, s, &req.method_name, req.msg) - .await - } - None => pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(req.msg.id), - ), - } - } else { - match self.services.get(&req.srvc_name) { - Some(s) => self.handle_call_request(s, &req.method_name, req.msg).await, - None => pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(req.msg.id), - ), - } - } - } - - /// Handles a call request - async fn handle_call_request( - &self, - service: &Arc, - method_name: &str, - rpc_msg: message::Request, - ) -> message::Response { - let method = match service.get_method(method_name) { - Some(m) => m, - None => { - return pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(rpc_msg.id), - ); - } - }; - - let result = match method(rpc_msg.params.clone()).await { - Ok(res) => res, - Err(err) => return self.handle_error(err, rpc_msg.id), - }; - - message::Response { + let mut response = message::Response { jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, - result: Some(result), - id: Some(rpc_msg.id), - } - } + result: None, + id: Some(req.msg.id.clone()), + }; - /// Handles a pubsub request - async fn handle_pubsub_request( - &self, - channel: ArcChannel, - service: &Arc, - method_name: &str, - rpc_msg: message::Request, - ) -> message::Response { - let method = match service.get_pubsub_method(method_name) { - Some(m) => m, - None => { - return pack_err_res( - message::METHOD_NOT_FOUND_ERROR_CODE, - METHOD_NOT_FOUND_ERROR_MSG, - Some(rpc_msg.id), - ); + if let Some(service) = self.pubsub_services.get(&req.srvc_name) { + if let Some(method) = service.get_pubsub_method(&req.method_name) { + let name = format!("{}.{}", service.name(), req.method_name); + response.result = match method(channel, name, req.msg.params.clone()).await { + Ok(res) => Some(res), + Err(err) => return self.handle_error(err, req.msg.id), + }; + + return response; } - }; + } - let name = format!("{}.{}", service.name(), method_name); - let result = match method(channel, name, rpc_msg.params.clone()).await { - Ok(res) => res, - Err(err) => return self.handle_error(err, rpc_msg.id), - }; + if let Some(service) = self.services.get(&req.srvc_name) { + if let Some(method) = service.get_method(&req.method_name) { + response.result = match method(req.msg.params.clone()).await { + Ok(res) => Some(res), + Err(err) => return self.handle_error(err, req.msg.id), + }; - message::Response { - jsonrpc: message::JSONRPC_VERSION.to_string(), - error: None, - result: Some(result), - id: Some(rpc_msg.id), + return response; + } } + + pack_err_res( + message::METHOD_NOT_FOUND_ERROR_CODE, + METHOD_NOT_FOUND_ERROR_MSG, + Some(req.msg.id), + ) } fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response { -- cgit v1.2.3