aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 00:19:58 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 00:19:58 +0200
commit2d1a8aea0b9330cd2eaad26eb187644adad6bed9 (patch)
tree6083adaa09ae2f0ef2912f7934cdf0bfafff1654 /jsonrpc/src
parentcc1d61c401e52ba3b6cd264c5400fb7ab52522dc (diff)
jsonrpc: spawn task when handle new request
Diffstat (limited to 'jsonrpc/src')
-rw-r--r--jsonrpc/src/client/mod.rs45
-rw-r--r--jsonrpc/src/codec.rs14
-rw-r--r--jsonrpc/src/error.rs3
-rw-r--r--jsonrpc/src/message.rs29
-rw-r--r--jsonrpc/src/server/channel.rs39
-rw-r--r--jsonrpc/src/server/mod.rs124
-rw-r--r--jsonrpc/src/server/pubsub_service.rs6
7 files changed, 181 insertions, 79 deletions
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index c9253fc..0d8ccb8 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -12,7 +12,7 @@ use tokio_rustls::rustls;
use karyon_core::{
async_runtime::lock::Mutex,
async_util::{timeout, TaskGroup, TaskResult},
- util::random_32,
+ util::random_64,
};
use karyon_net::{tls::ClientTlsConfig, Conn, Endpoint, ToEndpoint};
@@ -54,7 +54,10 @@ impl Client {
let request = self.send_request(method, params, None).await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -84,7 +87,10 @@ impl Client {
let request = self.send_request(method, params, Some(json!(true))).await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -95,7 +101,7 @@ impl Client {
return Err(Error::InvalidMsg("Invalid response id"));
}
- let sub_id = match response.subscription {
+ let sub_id = match response.result {
Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
None => return Err(Error::InvalidMsg("Invalid subscription id")),
};
@@ -116,7 +122,10 @@ impl Client {
.await?;
debug!("--> {request}");
- let response = self.chan_rx.recv().await?;
+ let response = match self.timeout {
+ Some(t) => timeout(Duration::from_millis(t), self.chan_rx.recv()).await??,
+ None => self.chan_rx.recv().await?,
+ };
debug!("<-- {response}");
if let Some(error) = response.error {
@@ -137,11 +146,11 @@ impl Client {
params: T,
subscriber: Option<serde_json::Value>,
) -> Result<message::Request> {
- let id = json!(random_32());
+ let id = random_64();
let request = message::Request {
jsonrpc: message::JSONRPC_VERSION.to_string(),
- id,
+ id: json!(id),
method: method.to_string(),
params: json!(params),
subscriber,
@@ -150,9 +159,9 @@ impl Client {
let req_json = serde_json::to_value(&request)?;
match self.timeout {
- Some(s) => {
- let dur = Duration::from_millis(s);
- timeout(dur, self.conn.send(req_json)).await??;
+ Some(ms) => {
+ let t = Duration::from_millis(ms);
+ timeout(t, self.conn.send(req_json)).await??;
}
None => {
self.conn.send(req_json).await?;
@@ -176,15 +185,14 @@ impl Client {
async move {
loop {
let msg = selfc.conn.recv().await?;
-
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
selfc.chan_tx.send(res).await?;
continue;
}
if let Ok(nt) = serde_json::from_value::<message::Notification>(msg.clone()) {
- let sub_id = match nt.subscription.clone() {
- Some(id) => serde_json::from_value::<SubscriptionID>(id)?,
+ let sub_result: message::NotificationResult = match nt.params {
+ Some(p) => serde_json::from_value(p)?,
None => {
return Err(Error::InvalidMsg(
"Invalid notification msg: subscription id not found",
@@ -192,13 +200,18 @@ impl Client {
}
};
- match selfc.subscriptions.lock().await.get(&sub_id) {
+ match selfc
+ .subscriptions
+ .lock()
+ .await
+ .get(&sub_result.subscription)
+ {
Some(s) => {
- s.send(nt.params.unwrap_or(json!(""))).await?;
+ s.send(sub_result.result.unwrap_or(json!(""))).await?;
continue;
}
None => {
- warn!("Receive unknown notification {sub_id}");
+ warn!("Receive unknown notification {}", sub_result.subscription);
continue;
}
}
diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs
index 29c6f13..cc11602 100644
--- a/jsonrpc/src/codec.rs
+++ b/jsonrpc/src/codec.rs
@@ -38,7 +38,7 @@ impl Decoder for JsonCodec {
let item = match iter.next() {
Some(Ok(item)) => item,
Some(Err(ref e)) if e.is_eof() => return Ok(None),
- Some(Err(e)) => return Err(Error::Encode(e.to_string())),
+ Some(Err(e)) => return Err(Error::Decode(e.to_string())),
None => return Ok(None),
};
@@ -70,13 +70,21 @@ impl WebSocketEncoder for WsJsonCodec {
#[cfg(feature = "ws")]
impl WebSocketDecoder for WsJsonCodec {
type DeItem = serde_json::Value;
- fn decode(&self, src: &Message) -> Result<Self::DeItem> {
+ fn decode(&self, src: &Message) -> Result<Option<Self::DeItem>> {
match src {
Message::Text(s) => match serde_json::from_str(s) {
+ Ok(m) => Ok(Some(m)),
+ Err(err) => Err(Error::Decode(err.to_string())),
+ },
+ Message::Binary(s) => match serde_json::from_slice(s) {
Ok(m) => Ok(m),
Err(err) => Err(Error::Decode(err.to_string())),
},
- _ => Err(Error::Decode("Receive wrong message".to_string())),
+ Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
+ m => Err(Error::Decode(format!(
+ "Receive unexpected message: {:?}",
+ m
+ ))),
}
}
}
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index d68e169..e1cb071 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -32,6 +32,9 @@ pub enum Error {
#[error("Unsupported protocol: {0}")]
UnsupportedProtocol(String),
+ #[error("Receive close message from connection: {0}")]
+ CloseConnection(String),
+
#[error("Subscription not found: {0}")]
SubscriptionNotFound(String),
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 9c89362..2cf28b1 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -1,5 +1,9 @@
use serde::{Deserialize, Serialize};
+use crate::SubscriptionID;
+
+pub type ID = u64;
+
pub const JSONRPC_VERSION: &str = "2.0";
/// Parse error: Invalid JSON was received by the server.
@@ -32,24 +36,25 @@ pub struct Request {
pub struct Response {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
+ pub id: Option<serde_json::Value>,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<Error>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub id: Option<serde_json::Value>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub subscription: Option<serde_json::Value>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Notification {
pub jsonrpc: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub method: Option<String>,
+ pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub subscription: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct NotificationResult {
+ pub result: Option<serde_json::Value>,
+ pub subscription: SubscriptionID,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -74,8 +79,8 @@ impl std::fmt::Display for Response {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{jsonrpc: {}, result': {:?}, error: {:?} , id: {:?}, subscription: {:?}}}",
- self.jsonrpc, self.result, self.error, self.id, self.subscription
+ "{{jsonrpc: {}, result': {:?}, error: {:?} , id: {:?}}}",
+ self.jsonrpc, self.result, self.error, self.id,
)
}
}
@@ -94,8 +99,8 @@ impl std::fmt::Display for Notification {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{jsonrpc: {}, method: {:?}, params: {:?}, subscription: {:?}}}",
- self.jsonrpc, self.method, self.params, self.subscription
+ "{{jsonrpc: {}, method: {:?}, params: {:?}}}",
+ self.jsonrpc, self.method, self.params
)
}
}
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 1498825..f14c1dd 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -7,11 +7,18 @@ use crate::{Error, Result};
pub type SubscriptionID = u32;
pub type ArcChannel = Arc<Channel>;
+pub(crate) struct NewNotification {
+ pub sub_id: SubscriptionID,
+ pub result: serde_json::Value,
+ pub method: String,
+}
+
/// Represents a new subscription
pub struct Subscription {
pub id: SubscriptionID,
parent: Arc<Channel>,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: String,
}
impl Subscription {
@@ -19,15 +26,26 @@ impl Subscription {
fn new(
parent: Arc<Channel>,
id: SubscriptionID,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: &str,
) -> Self {
- Self { parent, id, chan }
+ Self {
+ parent,
+ id,
+ chan,
+ method: method.to_string(),
+ }
}
/// Sends a notification to the subscriber
pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
if self.parent.subs.lock().await.contains(&self.id) {
- self.chan.send((self.id, res)).await?;
+ let nt = NewNotification {
+ sub_id: self.id,
+ result: res,
+ method: self.method.clone(),
+ };
+ self.chan.send(nt).await?;
Ok(())
} else {
Err(Error::SubscriptionNotFound(self.id.to_string()))
@@ -37,13 +55,13 @@ impl Subscription {
/// Represents a channel for creating/removing subscriptions
pub struct Channel {
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
subs: Mutex<Vec<SubscriptionID>>,
}
impl Channel {
/// Creates a new `Channel`
- pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel {
+ pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
Arc::new(Self {
chan,
subs: Mutex::new(Vec::new()),
@@ -51,19 +69,20 @@ impl Channel {
}
/// Creates a new subscription
- pub async fn new_subscription(self: &Arc<Self>) -> Subscription {
+ pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {
let sub_id = random_32();
- let sub = Subscription::new(self.clone(), sub_id, self.chan.clone());
+ let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method);
self.subs.lock().await.push(sub_id);
sub
}
/// Removes a subscription
pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) {
- let i = match self.subs.lock().await.iter().position(|i| i == id) {
+ let mut subs = self.subs.lock().await;
+ let i = match subs.iter().position(|i| i == id) {
Some(i) => i,
None => return,
};
- self.subs.lock().await.remove(i);
+ subs.remove(i);
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 4ebab10..29b1a10 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -47,7 +47,6 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message:
error: Some(err),
result: None,
id,
- subscription: None,
}
}
@@ -77,19 +76,31 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: Arc<Self>) -> Result<()> {
- loop {
- match self.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = self.handle_conn(conn).await {
- error!("Failed to handle a new conn: {err}")
+ pub async fn start(self: &Arc<Self>) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Accept loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ match selfc.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = selfc.handle_conn(conn).await {
+ error!("Failed to handle a new conn: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Failed to accept a new conn: {err}")
+ }
}
}
- Err(err) => {
- error!("Failed to accept a new conn: {err}")
- }
- }
- }
+ },
+ on_failure,
+ );
}
/// Shuts down the RPC server
@@ -102,6 +113,40 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
+ // TODO Avoid depending on channels
+ let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP);
+
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let channel = Channel::new(ch_tx);
+
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ debug!("Notification loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ let txc = tx.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let nt = ch_rx.recv().await?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let response = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {response}");
+ txc.send(serde_json::to_value(response)?).await?;
+ }
+ },
+ on_failure,
+ );
+
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
@@ -110,30 +155,14 @@ impl Server {
}
};
- let selfc = self.clone();
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
- let channel = Channel::new(ch_tx);
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), ch_rx.recv()).await {
+ match select(conn.recv(), rx.recv()).await {
Either::Left(msg) => {
- // TODO spawn a task
- let response = selfc.handle_request(channel.clone(), msg?).await;
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
- }
- Either::Right(msg) => {
- let (sub_id, result) = msg?;
- let response = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: None,
- params: Some(result),
- subscription: Some(sub_id.into()),
- };
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
+ selfc.new_request(tx.clone(), channel.clone(), msg?).await;
}
+ Either::Right(msg) => conn.send(msg?).await?,
}
}
},
@@ -176,8 +205,32 @@ impl Server {
})
}
+ /// Spawns a new task for handling a new request
+ async fn new_request(
+ self: &Arc<Self>,
+ sender: async_channel::Sender<serde_json::Value>,
+ channel: ArcChannel,
+ msg: serde_json::Value,
+ ) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Failed to handle a request: {err}");
+ }
+ };
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ let response = selfc._handle_request(channel, msg).await;
+ debug!("--> {response}");
+ sender.send(serde_json::json!(response)).await?;
+ Ok(())
+ },
+ on_failure,
+ );
+ }
+
/// Handles a new request
- async fn handle_request(
+ async fn _handle_request(
&self,
channel: ArcChannel,
msg: serde_json::Value,
@@ -239,7 +292,6 @@ impl Server {
error: None,
result: Some(result),
id: Some(rpc_msg.id),
- subscription: None,
}
}
@@ -262,7 +314,8 @@ impl Server {
}
};
- let result = match method(channel, rpc_msg.params.clone()).await {
+ let name = format!("{}.{}", service.name(), method_name);
+ let result = match method(channel, name, rpc_msg.params.clone()).await {
Ok(res) => res,
Err(err) => return self.handle_error(err, rpc_msg.id),
};
@@ -270,9 +323,8 @@ impl Server {
message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
- result: None,
+ result: Some(result),
id: Some(rpc_msg.id),
- subscription: Some(result),
}
}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 5b4bf9a..5b3b50b 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -6,7 +6,7 @@ use super::channel::ArcChannel;
/// Represents the RPC method
pub type PubSubRPCMethod<'a> =
- Box<dyn Fn(ArcChannel, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
+ Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
type PubSubRPCMethodOutput<'a> =
Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
@@ -51,7 +51,9 @@ macro_rules! impl_pubsub_rpc_service {
match name {
$(
stringify!($m) => {
- Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, params: serde_json::Value| Box::pin(self.$m(chan, params))))
+ Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| {
+ Box::pin(self.$m(chan, method, params))
+ }))
}
)*
_ => None,