From a06239ccc5e21fd20182ec3046cf9174ecc58a43 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 30 May 2024 02:07:50 +0200 Subject: jsonrpc/server: use queue with condvar instead of async channels --- jsonrpc/examples/pubsub_client.rs | 68 +++++++++++++++---------------- jsonrpc/examples/tokio_server/Cargo.toml | 4 ++ jsonrpc/examples/tokio_server/src/main.rs | 45 ++++++++++++++++++-- 3 files changed, 79 insertions(+), 38 deletions(-) (limited to 'jsonrpc/examples') diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index fee2a26..830b32f 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -1,47 +1,47 @@ +use std::time::Duration; + use serde::{Deserialize, Serialize}; -use smol::stream::StreamExt; +use smol::Timer; use karyon_jsonrpc::Client; #[derive(Deserialize, Serialize, Debug)] struct Pong {} -fn main() { - env_logger::init(); - smol::future::block_on(async { - let client = Client::builder("tcp://127.0.0.1:6000") - .expect("Create client builder") - .build() - .await - .expect("Build a client"); - - let result: Pong = client +async fn run_client() { + let client = Client::builder("tcp://127.0.0.1:6000") + .expect("Create client builder") + .build() + .await + .expect("Build a client"); + + let clientc = client.clone(); + smol::spawn(async move {}).detach(); + + let (_, sub) = client + .subscribe("Calc.log_subscribe", ()) + .await + .expect("Subscribe to log_subscribe method"); + + smol::spawn(async move { + loop { + let _m = sub.recv().await.unwrap(); + } + }) + .detach(); + + loop { + Timer::after(Duration::from_secs(1)).await; + let _: Pong = clientc .call("Calc.ping", ()) .await .expect("Send ping request"); + } +} - println!("receive pong msg: {:?}", result); - - let (sub_id, sub) = client - .subscribe("Calc.log_subscribe", ()) - .await - .expect("Subscribe to log_subscribe method"); - - smol::spawn(async move { - sub.for_each(|m| { - println!("Receive new notification: {m}"); - }) - .await - }) - .detach(); - - smol::Timer::after(std::time::Duration::from_secs(5)).await; - - client - .unsubscribe("Calc.log_unsubscribe", sub_id) - .await - .expect("Unsubscribe from log_unsubscirbe method"); - - smol::Timer::after(std::time::Duration::from_secs(2)).await; +fn main() { + env_logger::init(); + smol::future::block_on(async { + smol::spawn(run_client()).await; }); } diff --git a/jsonrpc/examples/tokio_server/Cargo.toml b/jsonrpc/examples/tokio_server/Cargo.toml index 93d8a61..9ed681b 100644 --- a/jsonrpc/examples/tokio_server/Cargo.toml +++ b/jsonrpc/examples/tokio_server/Cargo.toml @@ -12,3 +12,7 @@ serde = { version = "1.0.202", features = ["derive"] } serde_json = "1.0.117" tokio = { version = "1.37.0", features = ["full"] } +[profile.release] +debug = true + + diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 41d4c74..3bb4871 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use karyon_jsonrpc::{rpc_impl, Error, Server}; +use karyon_jsonrpc::{ + message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, +}; struct Calc { version: String, @@ -39,18 +41,53 @@ impl Calc { } } +#[rpc_pubsub_impl] +impl Calc { + async fn log_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: Value, + ) -> Result { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if let Err(_) = sub.notify(serde_json::json!("Hello")).await { + break; + } + } + }); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: Value, + ) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + #[tokio::main] async fn main() { env_logger::init(); // Register the Calc service - let calc = Calc { + let calc = Arc::new(Calc { version: String::from("0.1"), - }; + }); // Creates a new server let server = Server::builder("ws://127.0.0.1:6000") .expect("Create a new server builder") - .service(Arc::new(calc)) + .service(calc.clone()) + .pubsub_service(calc) .build() .await .expect("start a new server"); -- cgit v1.2.3