diff options
Diffstat (limited to 'jsonrpc')
| -rw-r--r-- | jsonrpc/examples/pubsub_client.rs | 68 | ||||
| -rw-r--r-- | jsonrpc/examples/tokio_server/Cargo.toml | 4 | ||||
| -rw-r--r-- | jsonrpc/examples/tokio_server/src/main.rs | 45 | ||||
| -rw-r--r-- | jsonrpc/src/client/mod.rs | 7 | ||||
| -rw-r--r-- | jsonrpc/src/server/channel.rs | 10 | ||||
| -rw-r--r-- | jsonrpc/src/server/mod.rs | 75 | 
6 files changed, 154 insertions, 55 deletions
| diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index fee2a26..830b32f 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -1,47 +1,47 @@ +use std::time::Duration; +  use serde::{Deserialize, Serialize}; -use smol::stream::StreamExt; +use smol::Timer;  use karyon_jsonrpc::Client;  #[derive(Deserialize, Serialize, Debug)]  struct Pong {} -fn main() { -    env_logger::init(); -    smol::future::block_on(async { -        let client = Client::builder("tcp://127.0.0.1:6000") -            .expect("Create client builder") -            .build() -            .await -            .expect("Build a client"); - -        let result: Pong = client +async fn run_client() { +    let client = Client::builder("tcp://127.0.0.1:6000") +        .expect("Create client builder") +        .build() +        .await +        .expect("Build a client"); + +    let clientc = client.clone(); +    smol::spawn(async move {}).detach(); + +    let (_, sub) = client +        .subscribe("Calc.log_subscribe", ()) +        .await +        .expect("Subscribe to log_subscribe method"); + +    smol::spawn(async move { +        loop { +            let _m = sub.recv().await.unwrap(); +        } +    }) +    .detach(); + +    loop { +        Timer::after(Duration::from_secs(1)).await; +        let _: Pong = clientc              .call("Calc.ping", ())              .await              .expect("Send ping request"); +    } +} -        println!("receive pong msg: {:?}", result); - -        let (sub_id, sub) = client -            .subscribe("Calc.log_subscribe", ()) -            .await -            .expect("Subscribe to log_subscribe method"); - -        smol::spawn(async move { -            sub.for_each(|m| { -                println!("Receive new notification: {m}"); -            }) -            .await -        }) -        .detach(); - -        smol::Timer::after(std::time::Duration::from_secs(5)).await; - -        client -            .unsubscribe("Calc.log_unsubscribe", sub_id) -            .await -            .expect("Unsubscribe from log_unsubscirbe method"); - -        smol::Timer::after(std::time::Duration::from_secs(2)).await; +fn main() { +    env_logger::init(); +    smol::future::block_on(async { +        smol::spawn(run_client()).await;      });  } diff --git a/jsonrpc/examples/tokio_server/Cargo.toml b/jsonrpc/examples/tokio_server/Cargo.toml index 93d8a61..9ed681b 100644 --- a/jsonrpc/examples/tokio_server/Cargo.toml +++ b/jsonrpc/examples/tokio_server/Cargo.toml @@ -12,3 +12,7 @@ serde = { version = "1.0.202", features = ["derive"] }  serde_json = "1.0.117"  tokio = { version = "1.37.0", features = ["full"] } +[profile.release] +debug = true + + diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 41d4c74..3bb4871 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration};  use serde::{Deserialize, Serialize};  use serde_json::Value; -use karyon_jsonrpc::{rpc_impl, Error, Server}; +use karyon_jsonrpc::{ +    message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, +};  struct Calc {      version: String, @@ -39,18 +41,53 @@ impl Calc {      }  } +#[rpc_pubsub_impl] +impl Calc { +    async fn log_subscribe( +        &self, +        chan: ArcChannel, +        method: String, +        _params: Value, +    ) -> Result<Value, Error> { +        let sub = chan.new_subscription(&method).await; +        let sub_id = sub.id.clone(); +        tokio::spawn(async move { +            loop { +                tokio::time::sleep(std::time::Duration::from_secs(1)).await; +                if let Err(_) = sub.notify(serde_json::json!("Hello")).await { +                    break; +                } +            } +        }); + +        Ok(serde_json::json!(sub_id)) +    } + +    async fn log_unsubscribe( +        &self, +        chan: ArcChannel, +        _method: String, +        params: Value, +    ) -> Result<Value, Error> { +        let sub_id: SubscriptionID = serde_json::from_value(params)?; +        chan.remove_subscription(&sub_id).await; +        Ok(serde_json::json!(true)) +    } +} +  #[tokio::main]  async fn main() {      env_logger::init();      // Register the Calc service -    let calc = Calc { +    let calc = Arc::new(Calc {          version: String::from("0.1"), -    }; +    });      // Creates a new server      let server = Server::builder("ws://127.0.0.1:6000")          .expect("Create a new server builder") -        .service(Arc::new(calc)) +        .service(calc.clone()) +        .pubsub_service(calc)          .build()          .await          .expect("start a new server"); diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 326252a..3a4505c 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -149,7 +149,12 @@ impl Client {              async move {                  loop {                      let msg = selfc.conn.recv().await?; -                    selfc.handle_msg(msg).await?; +                    if let Err(err) = selfc.handle_msg(msg).await { +                        error!( +                            "Failed to handle a new received msg from the connection {} : {err}", +                            selfc.conn.peer_endpoint()? +                        ); +                    }                  }              },              on_failure, diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index efcd344..a9d1002 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -59,7 +59,7 @@ pub struct Channel {  }  impl Channel { -    /// Creates a new `Channel` +    /// Creates a new [`Channel`]      pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> ArcChannel {          Arc::new(Self {              chan, @@ -67,7 +67,7 @@ impl Channel {          })      } -    /// Creates a new subscription +    /// Creates a new [`Subscription`]      pub async fn new_subscription(self: &Arc<Self>, method: &str) -> Subscription {          let sub_id = random_32();          let sub = Subscription::new(self.clone(), sub_id, self.chan.clone(), method); @@ -76,7 +76,7 @@ impl Channel {      }      /// Removes a subscription -    pub async fn remove_subscription(self: &Arc<Self>, id: &SubscriptionID) { +    pub async fn remove_subscription(&self, id: &SubscriptionID) {          let mut subs = self.subs.lock().await;          let i = match subs.iter().position(|i| i == id) {              Some(i) => i, @@ -84,4 +84,8 @@ impl Channel {          };          subs.remove(i);      } + +    pub fn close(&self) { +        self.chan.close(); +    }  } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 7f28de2..7136be4 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -3,11 +3,17 @@ pub mod channel;  pub mod pubsub_service;  pub mod service; -use std::{collections::HashMap, sync::Arc}; +use std::{ +    collections::{HashMap, VecDeque}, +    sync::Arc, +};  use log::{debug, error, trace, warn}; -use karyon_core::async_util::{select, Either, TaskGroup, TaskResult}; +use karyon_core::{ +    async_runtime::lock::Mutex, +    async_util::{select, CondVar, Either, TaskGroup, TaskResult}, +};  use karyon_net::{Conn, Endpoint, Listener};  use crate::{message, Error, PubSubRPCService, RPCService, Result}; @@ -47,6 +53,42 @@ enum SanityCheckResult {      ErrRes(message::Response),  } +struct ResponseQueue<T> { +    queue: Mutex<VecDeque<T>>, +    condvar: CondVar, +} + +impl<T> ResponseQueue<T> { +    fn new() -> Arc<Self> { +        Arc::new(Self { +            queue: Mutex::new(VecDeque::new()), +            condvar: CondVar::new(), +        }) +    } + +    /// Wait while the queue is empty, remove and return the item from the queue, +    /// panicking if empty (shouldn't happen) +    async fn recv(&self) -> T { +        let mut queue = self.queue.lock().await; + +        while queue.is_empty() { +            queue = self.condvar.wait(queue).await; +        } + +        match queue.pop_front() { +            Some(v) => v, +            None => unreachable!(), +        } +    } + +    /// Push an item into the queue, notify all waiting tasks that the +    /// condvar has changed +    async fn push(&self, res: T) { +        self.queue.lock().await.push_back(res); +        self.condvar.broadcast(); +    } +} +  /// Represents an RPC server  pub struct Server {      listener: Listener<serde_json::Value>, @@ -99,20 +141,21 @@ impl Server {          let endpoint = conn.peer_endpoint().expect("get peer endpoint");          debug!("Handle a new connection {endpoint}"); -        // TODO Avoid depending on channels -        let (tx, rx) = async_channel::bounded::<serde_json::Value>(CHANNEL_CAP); -          let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_CAP);          let channel = Channel::new(ch_tx); +        let queue = ResponseQueue::new(); + +        let chan = channel.clone();          let on_failure = |result: TaskResult<Result<()>>| async move {              if let TaskResult::Completed(Err(err)) = result {                  debug!("Notification loop stopped: {err}");              } +            // close the subscription channel +            chan.close();          }; -        let selfc = self.clone(); -        let txc = tx.clone(); +        let queue_cloned = queue.clone();          self.task_group.spawn(              async move {                  loop { @@ -127,28 +170,34 @@ impl Server {                          params,                      };                      debug!("--> {response}"); -                    txc.send(serde_json::to_value(response)?).await?; +                    queue_cloned.push(serde_json::json!(response)).await;                  }              },              on_failure,          ); +        let chan = channel.clone();          let on_failure = |result: TaskResult<Result<()>>| async move {              if let TaskResult::Completed(Err(err)) = result {                  error!("Connection {} dropped: {}", endpoint, err);              } else {                  warn!("Connection {} dropped", endpoint);              } +            // close the subscription channel when the connection dropped +            chan.close();          }; +        let selfc = self.clone();          self.task_group.spawn(              async move {                  loop { -                    match select(conn.recv(), rx.recv()).await { +                    match select(conn.recv(), queue.recv()).await {                          Either::Left(msg) => { -                            selfc.new_request(tx.clone(), channel.clone(), msg?).await; +                            selfc +                                .new_request(queue.clone(), channel.clone(), msg?) +                                .await;                          } -                        Either::Right(msg) => conn.send(msg?).await?, +                        Either::Right(res) => conn.send(res).await?,                      }                  }              }, @@ -194,7 +243,7 @@ impl Server {      /// Spawns a new task for handling a new request      async fn new_request(          self: &Arc<Self>, -        sender: async_channel::Sender<serde_json::Value>, +        queue: Arc<ResponseQueue<serde_json::Value>>,          channel: ArcChannel,          msg: serde_json::Value,      ) { @@ -209,7 +258,7 @@ impl Server {              async move {                  let response = selfc.handle_request(channel, msg).await;                  debug!("--> {response}"); -                sender.send(serde_json::json!(response)).await?; +                queue.push(serde_json::json!(response)).await;                  Ok(())              },              on_failure, | 
