aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server')
-rw-r--r--jsonrpc/src/server/channel.rs39
-rw-r--r--jsonrpc/src/server/mod.rs124
-rw-r--r--jsonrpc/src/server/pubsub_service.rs6
3 files changed, 121 insertions, 48 deletions
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 1498825..f14c1dd 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -7,11 +7,18 @@ use crate::{Error, Result};
pub type SubscriptionID = u32;
pub type ArcChannel = Arc<Channel>;
+pub(crate) struct NewNotification {
+ pub sub_id: SubscriptionID,
+ pub result: serde_json::Value,
+ pub method: String,
+}
+
/// Represents a new subscription
pub struct Subscription {
pub id: SubscriptionID,
parent: Arc<Channel>,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: String,
}
impl Subscription {
@@ -19,15 +26,26 @@ impl Subscription {
fn new(
parent: Arc<Channel>,
id: SubscriptionID,
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
+ method: &str,
) -> Self {
- Self { parent, id, chan }
+ Self {
+ parent,
+ id,
+ chan,
+ method: method.to_string(),
+ }
}
/// Sends a notification to the subscriber
pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
if self.parent.subs.lock().await.contains(&self.id) {
- self.chan.send((self.id, res)).await?;
+ let nt = NewNotification {
+ sub_id: self.id,
+ result: res,
+ method: self.method.clone(),
+ };
+ self.chan.send(nt).await?;
Ok(())
} else {
Err(Error::SubscriptionNotFound(self.id.to_string()))
@@ -37,13 +55,13 @@ impl Subscription {
/// Represents a channel for creating/removing subscriptions
pub struct Channel {
- chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>,
+ chan: async_channel::Sender<NewNotification>,
subs: Mutex<Vec<SubscriptionID>>,
}
impl Channel {
/// Creates a new `Channel`
- pub fn new(chan: async_channel::Sender<(SubscriptionID, serde_json::Value)>) -> ArcChannel {
+ pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {
Arc::new(Self {
chan,
subs: Mutex::new(Vec::new()),
@@ -51,19 +69,20 @@ impl Channel {
}
/// Creates a new subscription
- pub async fn new_subscription(self: &Arc<Self>) -> Subscription {
+ pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {
let sub_id = random_32();
- let sub = Subscription::new(self.clone(), sub_id, self.chan.clone());
+ let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method);
self.subs.lock().await.push(sub_id);
sub
}
/// Removes a subscription
pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) {
- let i = match self.subs.lock().await.iter().position(|i| i == id) {
+ let mut subs = self.subs.lock().await;
+ let i = match subs.iter().position(|i| i == id) {
Some(i) => i,
None => return,
};
- self.subs.lock().await.remove(i);
+ subs.remove(i);
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 4ebab10..29b1a10 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -47,7 +47,6 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message:
error: Some(err),
result: None,
id,
- subscription: None,
}
}
@@ -77,19 +76,31 @@ impl Server {
}
/// Starts the RPC server
- pub async fn start(self: Arc<Self>) -> Result<()> {
- loop {
- match self.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = self.handle_conn(conn).await {
- error!("Failed to handle a new conn: {err}")
+ pub async fn start(self: &Arc<Self>) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Accept loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ match selfc.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = selfc.handle_conn(conn).await {
+ error!("Failed to handle a new conn: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Failed to accept a new conn: {err}")
+ }
}
}
- Err(err) => {
- error!("Failed to accept a new conn: {err}")
- }
- }
- }
+ },
+ on_failure,
+ );
}
/// Shuts down the RPC server
@@ -102,6 +113,40 @@ impl Server {
let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
+ // TODO Avoid depending on channels
+ let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP);
+
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
+ let channel = Channel::new(ch_tx);
+
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ debug!("Notification loop stopped: {err}");
+ }
+ };
+
+ let selfc = self.clone();
+ let txc = tx.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let nt = ch_rx.recv().await?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let response = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {response}");
+ txc.send(serde_json::to_value(response)?).await?;
+ }
+ },
+ on_failure,
+ );
+
let on_failure = |result: TaskResult<Result<()>>| async move {
if let TaskResult::Completed(Err(err)) = result {
error!("Connection {} dropped: {}", endpoint, err);
@@ -110,30 +155,14 @@ impl Server {
}
};
- let selfc = self.clone();
- let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);
- let channel = Channel::new(ch_tx);
self.task_group.spawn(
async move {
loop {
- match select(conn.recv(), ch_rx.recv()).await {
+ match select(conn.recv(), rx.recv()).await {
Either::Left(msg) => {
- // TODO spawn a task
- let response = selfc.handle_request(channel.clone(), msg?).await;
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
- }
- Either::Right(msg) => {
- let (sub_id, result) = msg?;
- let response = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: None,
- params: Some(result),
- subscription: Some(sub_id.into()),
- };
- debug!("--> {response}");
- conn.send(serde_json::to_value(response)?).await?;
+ selfc.new_request(tx.clone(), channel.clone(), msg?).await;
}
+ Either::Right(msg) => conn.send(msg?).await?,
}
}
},
@@ -176,8 +205,32 @@ impl Server {
})
}
+ /// Spawns a new task for handling a new request
+ async fn new_request(
+ self: &Arc<Self>,
+ sender: async_channel::Sender<serde_json::Value>,
+ channel: ArcChannel,
+ msg: serde_json::Value,
+ ) {
+ let on_failure = |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Failed to handle a request: {err}");
+ }
+ };
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ let response = selfc._handle_request(channel, msg).await;
+ debug!("--> {response}");
+ sender.send(serde_json::json!(response)).await?;
+ Ok(())
+ },
+ on_failure,
+ );
+ }
+
/// Handles a new request
- async fn handle_request(
+ async fn _handle_request(
&self,
channel: ArcChannel,
msg: serde_json::Value,
@@ -239,7 +292,6 @@ impl Server {
error: None,
result: Some(result),
id: Some(rpc_msg.id),
- subscription: None,
}
}
@@ -262,7 +314,8 @@ impl Server {
}
};
- let result = match method(channel, rpc_msg.params.clone()).await {
+ let name = format!("{}.{}", service.name(), method_name);
+ let result = match method(channel, name, rpc_msg.params.clone()).await {
Ok(res) => res,
Err(err) => return self.handle_error(err, rpc_msg.id),
};
@@ -270,9 +323,8 @@ impl Server {
message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
- result: None,
+ result: Some(result),
id: Some(rpc_msg.id),
- subscription: Some(result),
}
}
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 5b4bf9a..5b3b50b 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -6,7 +6,7 @@ use super::channel::ArcChannel;
/// Represents the RPC method
pub type PubSubRPCMethod<'a> =
- Box<dyn Fn(ArcChannel, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
+ Box<dyn Fn(ArcChannel, String, serde_json::Value) -> PubSubRPCMethodOutput<'a> + Send + 'a>;
type PubSubRPCMethodOutput<'a> =
Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>;
@@ -51,7 +51,9 @@ macro_rules! impl_pubsub_rpc_service {
match name {
$(
stringify!($m) => {
- Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, params: serde_json::Value| Box::pin(self.$m(chan, params))))
+ Some(Box::new(move |chan: karyon_jsonrpc::ArcChannel, method: String, params: serde_json::Value| {
+ Box::pin(self.$m(chan, method, params))
+ }))
}
)*
_ => None,