aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/examples
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-21 02:20:45 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-22 15:02:06 +0200
commit028940fe3e0a87cdc421a6d07f1ecfb6c208b9d0 (patch)
tree3272d5c71cafb098e548cb9811e8f9ddc260ef2f /jsonrpc/examples
parent0f0cefb62ee8b641dcabcc0a2a1cf019c1de4843 (diff)
jsonrpc: support pubsub
Diffstat (limited to 'jsonrpc/examples')
-rw-r--r--jsonrpc/examples/pubsub_client.rs47
-rw-r--r--jsonrpc/examples/pubsub_server.rs69
-rw-r--r--jsonrpc/examples/server.rs4
-rw-r--r--jsonrpc/examples/tokio_server/Cargo.lock1
-rw-r--r--jsonrpc/examples/tokio_server/src/main.rs4
5 files changed, 123 insertions, 2 deletions
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
new file mode 100644
index 0000000..fee2a26
--- /dev/null
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -0,0 +1,47 @@
+use serde::{Deserialize, Serialize};
+use smol::stream::StreamExt;
+
+use karyon_jsonrpc::Client;
+
+#[derive(Deserialize, Serialize, Debug)]
+struct Pong {}
+
+fn main() {
+ env_logger::init();
+ smol::future::block_on(async {
+ let client = Client::builder("tcp://127.0.0.1:6000")
+ .expect("Create client builder")
+ .build()
+ .await
+ .expect("Build a client");
+
+ let result: Pong = client
+ .call("Calc.ping", ())
+ .await
+ .expect("Send ping request");
+
+ println!("receive pong msg: {:?}", result);
+
+ let (sub_id, sub) = client
+ .subscribe("Calc.log_subscribe", ())
+ .await
+ .expect("Subscribe to log_subscribe method");
+
+ smol::spawn(async move {
+ sub.for_each(|m| {
+ println!("Receive new notification: {m}");
+ })
+ .await
+ })
+ .detach();
+
+ smol::Timer::after(std::time::Duration::from_secs(5)).await;
+
+ client
+ .unsubscribe("Calc.log_unsubscribe", sub_id)
+ .await
+ .expect("Unsubscribe from log_unsubscirbe method");
+
+ smol::Timer::after(std::time::Duration::from_secs(2)).await;
+ });
+}
diff --git a/jsonrpc/examples/pubsub_server.rs b/jsonrpc/examples/pubsub_server.rs
new file mode 100644
index 0000000..739e6d5
--- /dev/null
+++ b/jsonrpc/examples/pubsub_server.rs
@@ -0,0 +1,69 @@
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server, SubscriptionID};
+
+struct Calc {}
+
+#[derive(Deserialize, Serialize)]
+struct Req {
+ x: u32,
+ y: u32,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Pong {}
+
+#[rpc_impl]
+impl Calc {
+ async fn ping(&self, _params: Value) -> Result<Value, Error> {
+ Ok(serde_json::json!(Pong {}))
+ }
+}
+
+#[rpc_pubsub_impl]
+impl Calc {
+ async fn log_subscribe(&self, chan: ArcChannel, _params: Value) -> Result<Value, Error> {
+ let sub = chan.new_subscription().await;
+ let sub_id = sub.id.clone();
+ smol::spawn(async move {
+ loop {
+ smol::Timer::after(std::time::Duration::from_secs(1)).await;
+ if let Err(err) = sub.notify(serde_json::json!("Hello")).await {
+ println!("Error send notification {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn log_unsubscribe(&self, chan: ArcChannel, params: Value) -> Result<Value, Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
+fn main() {
+ env_logger::init();
+ smol::block_on(async {
+ let calc = Arc::new(Calc {});
+
+ // Creates a new server
+ let server = Server::builder("tcp://127.0.0.1:6000")
+ .expect("Create a new server builder")
+ .service(calc.clone())
+ .pubsub_service(calc)
+ .build()
+ .await
+ .expect("Build a new server");
+
+ // Start the server
+ server.start().await.expect("Start the server");
+ });
+}
diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs
index 841e276..5b951cd 100644
--- a/jsonrpc/examples/server.rs
+++ b/jsonrpc/examples/server.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -48,7 +50,7 @@ fn main() {
// Creates a new server
let server = Server::builder("tcp://127.0.0.1:6000")
.expect("Create a new server builder")
- .service(calc)
+ .service(Arc::new(calc))
.build()
.await
.expect("start a new server");
diff --git a/jsonrpc/examples/tokio_server/Cargo.lock b/jsonrpc/examples/tokio_server/Cargo.lock
index a7fdb0b..ab39fcd 100644
--- a/jsonrpc/examples/tokio_server/Cargo.lock
+++ b/jsonrpc/examples/tokio_server/Cargo.lock
@@ -681,6 +681,7 @@ dependencies = [
name = "karyon_jsonrpc"
version = "0.1.0"
dependencies = [
+ "async-channel",
"async-trait",
"async-tungstenite",
"karyon_core",
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index 978c90a..ce77cd3 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -48,7 +50,7 @@ async fn main() {
// Creates a new server
let server = Server::builder("tcp://127.0.0.1:6000")
.expect("Create a new server builder")
- .service(calc)
+ .service(Arc::new(calc))
.build()
.await
.expect("start a new server");