From a06239ccc5e21fd20182ec3046cf9174ecc58a43 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 30 May 2024 02:07:50 +0200 Subject: jsonrpc/server: use queue with condvar instead of async channels --- jsonrpc/examples/tokio_server/src/main.rs | 45 ++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) (limited to 'jsonrpc/examples/tokio_server/src/main.rs') diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs index 41d4c74..3bb4871 100644 --- a/jsonrpc/examples/tokio_server/src/main.rs +++ b/jsonrpc/examples/tokio_server/src/main.rs @@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use karyon_jsonrpc::{rpc_impl, Error, Server}; +use karyon_jsonrpc::{ + message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, +}; struct Calc { version: String, @@ -39,18 +41,53 @@ impl Calc { } } +#[rpc_pubsub_impl] +impl Calc { + async fn log_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: Value, + ) -> Result { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if let Err(_) = sub.notify(serde_json::json!("Hello")).await { + break; + } + } + }); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: Value, + ) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + #[tokio::main] async fn main() { env_logger::init(); // Register the Calc service - let calc = Calc { + let calc = Arc::new(Calc { version: String::from("0.1"), - }; + }); // Creates a new server let server = Server::builder("ws://127.0.0.1:6000") .expect("Create a new server builder") - .service(Arc::new(calc)) + .service(calc.clone()) + .pubsub_service(calc) .build() .await .expect("start a new server"); -- cgit v1.2.3