aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/examples
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/examples')
-rw-r--r--jsonrpc/examples/pubsub_client.rs68
-rw-r--r--jsonrpc/examples/tokio_server/Cargo.toml4
-rw-r--r--jsonrpc/examples/tokio_server/src/main.rs45
3 files changed, 79 insertions, 38 deletions
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
index fee2a26..830b32f 100644
--- a/jsonrpc/examples/pubsub_client.rs
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -1,47 +1,47 @@
+use std::time::Duration;
+
use serde::{Deserialize, Serialize};
-use smol::stream::StreamExt;
+use smol::Timer;
use karyon_jsonrpc::Client;
#[derive(Deserialize, Serialize, Debug)]
struct Pong {}
-fn main() {
- env_logger::init();
- smol::future::block_on(async {
- let client = Client::builder("tcp://127.0.0.1:6000")
- .expect("Create client builder")
- .build()
- .await
- .expect("Build a client");
-
- let result: Pong = client
+async fn run_client() {
+ let client = Client::builder("tcp://127.0.0.1:6000")
+ .expect("Create client builder")
+ .build()
+ .await
+ .expect("Build a client");
+
+ let clientc = client.clone();
+ smol::spawn(async move {}).detach();
+
+ let (_, sub) = client
+ .subscribe("Calc.log_subscribe", ())
+ .await
+ .expect("Subscribe to log_subscribe method");
+
+ smol::spawn(async move {
+ loop {
+ let _m = sub.recv().await.unwrap();
+ }
+ })
+ .detach();
+
+ loop {
+ Timer::after(Duration::from_secs(1)).await;
+ let _: Pong = clientc
.call("Calc.ping", ())
.await
.expect("Send ping request");
+ }
+}
- println!("receive pong msg: {:?}", result);
-
- let (sub_id, sub) = client
- .subscribe("Calc.log_subscribe", ())
- .await
- .expect("Subscribe to log_subscribe method");
-
- smol::spawn(async move {
- sub.for_each(|m| {
- println!("Receive new notification: {m}");
- })
- .await
- })
- .detach();
-
- smol::Timer::after(std::time::Duration::from_secs(5)).await;
-
- client
- .unsubscribe("Calc.log_unsubscribe", sub_id)
- .await
- .expect("Unsubscribe from log_unsubscirbe method");
-
- smol::Timer::after(std::time::Duration::from_secs(2)).await;
+fn main() {
+ env_logger::init();
+ smol::future::block_on(async {
+ smol::spawn(run_client()).await;
});
}
diff --git a/jsonrpc/examples/tokio_server/Cargo.toml b/jsonrpc/examples/tokio_server/Cargo.toml
index 93d8a61..9ed681b 100644
--- a/jsonrpc/examples/tokio_server/Cargo.toml
+++ b/jsonrpc/examples/tokio_server/Cargo.toml
@@ -12,3 +12,7 @@ serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.37.0", features = ["full"] }
+[profile.release]
+debug = true
+
+
diff --git a/jsonrpc/examples/tokio_server/src/main.rs b/jsonrpc/examples/tokio_server/src/main.rs
index 41d4c74..3bb4871 100644
--- a/jsonrpc/examples/tokio_server/src/main.rs
+++ b/jsonrpc/examples/tokio_server/src/main.rs
@@ -3,7 +3,9 @@ use std::{sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use karyon_jsonrpc::{rpc_impl, Error, Server};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, ArcChannel, Error, Server,
+};
struct Calc {
version: String,
@@ -39,18 +41,53 @@ impl Calc {
}
}
+#[rpc_pubsub_impl]
+impl Calc {
+ async fn log_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: Value,
+ ) -> Result<Value, Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+ if let Err(_) = sub.notify(serde_json::json!("Hello")).await {
+ break;
+ }
+ }
+ });
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn log_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: Value,
+ ) -> Result<Value, Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
#[tokio::main]
async fn main() {
env_logger::init();
// Register the Calc service
- let calc = Calc {
+ let calc = Arc::new(Calc {
version: String::from("0.1"),
- };
+ });
// Creates a new server
let server = Server::builder("ws://127.0.0.1:6000")
.expect("Create a new server builder")
- .service(Arc::new(calc))
+ .service(calc.clone())
+ .pubsub_service(calc)
.build()
.await
.expect("start a new server");