diff options
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
| -rw-r--r-- | jsonrpc/src/server/mod.rs | 75 | 
1 files changed, 62 insertions, 13 deletions
| diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7f28de2..7136be4 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -3,11 +3,17 @@ pub mod channel;  pub mod pubsub_service;  pub mod service; -use std::{collections::HashMap, sync::Arc}; +use std::{ +    collections::{HashMap, VecDeque}, +    sync::Arc, +};  use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ +    async_runtime::lock::Mutex, +    async_util::{select, CondVar, Either, TaskGroup, TaskResult}, +};  use karyon_net::{Conn, Endpoint, Listener};  use crate::{message, Error, PubSubRPCService, RPCService, Result}; @@ -47,6 +53,42 @@ enum SanityCheckResult {      ErrRes(message::Response),  } +struct ResponseQueue<T> { +    queue: Mutex<VecDeque<T>>, +    condvar: CondVar, +} + +impl<T> ResponseQueue<T> { +    fn new() -> Arc<Self> { +        Arc::new(Self { +            queue: Mutex::new(VecDeque::new()), +            condvar: CondVar::new(), +        }) +    } + +    /// Wait while the queue is empty, remove and return the item from the queue, +    /// panicking if empty (shouldn't happen) +    async fn recv(&self) -> T { +        let mut queue = self.queue.lock().await; + +        while queue.is_empty() { +            queue = self.condvar.wait(queue).await; +        } + +        match queue.pop_front() { +            Some(v) => v, +            None => unreachable!(), +        } +    } + +    /// Push an item into the queue, notify all waiting tasks that the +    /// condvar has changed +    async fn push(&self, res: T) { +        self.queue.lock().await.push_back(res); +        self.condvar.broadcast(); +    } +} +  /// Represents an RPC server  pub struct Server {      listener: Listener<serde_json::Value>, @@ -99,20 +141,21 @@ impl Server {          let endpoint = conn.peer_endpoint().expect("get peer endpoint");          debug!("Handle a new connection {endpoint}"); -        // TODO Avoid depending on channels -        let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP); -          let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);          let channel = Channel::new(ch_tx); +        let queue = ResponseQueue::new(); + +        let chan = channel.clone();          let on_failure = |result: TaskResult<Result<()>>| async move {              if let TaskResult::Completed(Err(err)) = result {                  debug!("Notification loop stopped: {err}");              } +            // close the subscription channel +            chan.close();          }; -        let selfc = self.clone(); -        let txc = tx.clone(); +        let queue_cloned = queue.clone();          self.task_group.spawn(              async move {                  loop { @@ -127,28 +170,34 @@ impl Server {                          params,                      };                      debug!("--> {response}"); -                    txc.send(serde_json::to_value(response)?).await?; +                    queue_cloned.push(serde_json::json!(response)).await;                  }              },              on_failure,          ); +        let chan = channel.clone();          let on_failure = |result: TaskResult<Result<()>>| async move {              if let TaskResult::Completed(Err(err)) = result {                  error!("Connection {} dropped: {}", endpoint, err);              } else {                  warn!("Connection {} dropped", endpoint);              } +            // close the subscription channel when the connection dropped +            chan.close();          }; +        let selfc = self.clone();          self.task_group.spawn(              async move {                  loop { -                    match select(conn.recv(), rx.recv()).await { +                    match select(conn.recv(), queue.recv()).await {                          Either::Left(msg) => { -                            selfc.new_request(tx.clone(), channel.clone(), msg?).await; +                            selfc +                                .new_request(queue.clone(), channel.clone(), msg?) +                                .await;                          } -                        Either::Right(msg) => conn.send(msg?).await?, +                        Either::Right(res) => conn.send(res).await?,                      }                  }              }, @@ -194,7 +243,7 @@ impl Server {      /// Spawns a new task for handling a new request      async fn new_request(          self: &Arc<Self>, -        sender: async_channel::Sender<serde_json::Value>, +        queue: Arc<ResponseQueue<serde_json::Value>>,          channel: ArcChannel,          msg: serde_json::Value,      ) { @@ -209,7 +258,7 @@ impl Server {              async move {                  let response = selfc.handle_request(channel, msg).await;                  debug!("--> {response}"); -                sender.send(serde_json::json!(response)).await?; +                queue.push(serde_json::json!(response)).await;                  Ok(())              },              on_failure, | 
