From 028940fe3e0a87cdc421a6d07f1ecfb6c208b9d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 21 May 2024 02:20:45 +0200 Subject: jsonrpc: support pubsub --- jsonrpc/README.md | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 3 deletions(-) (limited to 'jsonrpc/README.md') diff --git a/jsonrpc/README.md b/jsonrpc/README.md index 0883bec..3322671 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -5,10 +5,13 @@ A fast and lightweight async implementation of [JSON-RPC features: - Supports TCP, TLS, WebSocket, and Unix protocols. -- Uses smol(async-std) as the async runtime, but also supports tokio via the +- Uses `smol`(async-std) as the async runtime, but also supports `tokio` via the `tokio` feature. - Allows registration of multiple services (structs) of different types on a single server. +- Supports pub/sub +- Allows passing an `async_executors::Executor` or tokio's `Runtime` when building + the server. ## Example @@ -16,8 +19,11 @@ features: use std::sync::Arc; use serde_json::Value; +use smol::stream::StreamExt; -use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; +use karyon_jsonrpc::{ + Error, Server, Client, rpc_impl, rpc_pubsub_impl, SubscriptionID, ArcChannel +}; struct HelloWorld {} @@ -37,12 +43,42 @@ impl HelloWorld { } } +#[rpc_pubsub_impl] +impl HelloWorld { + async fn log_subscribe(&self, chan: ArcChannel, _params: Value) -> Result { + let sub = chan.new_subscription().await; + let sub_id = sub.id.clone(); + smol::spawn(async move { + loop { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + if let Err(err) = sub.notify(serde_json::json!("Hello")).await { + println!("Error send notification {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn log_unsubscribe(&self, chan: ArcChannel, params: Value) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + + // Server async { + let service = Arc::new(HelloWorld {}); // Creates a new server + let server = Server::builder("tcp://127.0.0.1:60000") .expect("create new server builder") - .service(HelloWorld{}) + .service(service.clone()) + .pubsub_service(service) .build() .await .expect("build the server"); @@ -63,6 +99,26 @@ async { let result: String = client.call("HelloWorld.say_hello", "world".to_string()) .await .expect("send a request"); + + let (sub_id, sub) = client + .subscribe("Calc.log_subscribe", ()) + .await + .expect("Subscribe to log_subscribe method"); + + smol::spawn(async move { + sub.for_each(|m| { + println!("Receive new notification: {m}"); + }) + .await + }) + .detach(); + + smol::Timer::after(std::time::Duration::from_secs(5)).await; + + client + .unsubscribe("Calc.log_unsubscribe", sub_id) + .await + .expect("Unsubscribe from log_unsubscirbe method"); }; ``` -- cgit v1.2.3