From 7be7f59d5caf38ca0cd7a12a937a2cfdca0268d7 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 00:21:59 +0200 Subject: p2p: add rpc server to monitor example --- p2p/examples/monitor.rs | 120 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 29 deletions(-) (limited to 'p2p/examples') diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 32c8959..5382781 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -3,14 +3,17 @@ mod shared; use std::sync::Arc; use clap::Parser; +use log::error; use smol::{channel, Executor}; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, - Backend, Config, + ArcBackend, Backend, Config, }; +use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, SubscriptionID}; + use shared::run_executor; #[derive(Parser)] @@ -20,6 +23,10 @@ struct Cli { #[arg(short)] bootstrap_peers: Vec, + /// RPC server endpoint. + #[arg(short)] + rpc_endpoint: Endpoint, + /// Optional list of peer endpoints for manual connections. #[arg(short)] peer_endpoints: Vec, @@ -33,6 +40,71 @@ struct Cli { discovery_port: Option, } +struct MonitorRPC { + backend: ArcBackend, +} + +#[rpc_impl] +impl MonitorRPC { + async fn peer_id( + &self, + _params: serde_json::Value, + ) -> Result { + Ok(serde_json::json!(self.backend.peer_id().to_string())) + } + + async fn inbound_connection( + &self, + _params: serde_json::Value, + ) -> Result { + Ok(serde_json::json!(self.backend.inbound_slots())) + } + + async fn outbound_connection( + &self, + _params: serde_json::Value, + ) -> Result { + Ok(serde_json::json!(self.backend.inbound_slots())) + } +} + +#[rpc_pubsub_impl] +impl MonitorRPC { + async fn conn_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let conn_events = self.backend.monitor().conn_events().await; + smol::spawn(async move { + loop { + let _event = conn_events.recv().await; + if let Err(err) = sub.notify(serde_json::json!("event")).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn conn_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + fn main() { env_logger::init(); let cli = Cli::parse(); @@ -62,44 +134,34 @@ fn main() { let exc = ex.clone(); run_executor( async { - let monitor = backend.monitor(); - let conn_listener = monitor.conn_events().await; - let peerpool_listener = monitor.peer_pool_events().await; - let discovery_listener = monitor.discovery_events().await; - - let monitor_task = exc.spawn(async move { - loop { - let event = conn_listener.recv().await.unwrap(); - println!("New connection event: {}", event); - } - }); - - let monitor_task2 = exc.spawn(async move { - loop { - let event = peerpool_listener.recv().await.unwrap(); - println!("New peer pool event: {}", event); - } + // RPC service + let service = Arc::new(MonitorRPC { + backend: backend.clone(), }); - let monitor_task3 = exc.spawn(async move { - loop { - let event = discovery_listener.recv().await.unwrap(); - println!("New discovery event: {}", event); - } - }); + // Create rpc server + let server = Server::builder(cli.rpc_endpoint) + .expect("Create server builder") + .service(service.clone()) + .pubsub_service(service) + .build_with_executor(exc.clone().into()) + .await + .expect("Build rpc server"); // Run the backend - backend.run().await.unwrap(); + backend.run().await.expect("Run p2p backend"); + + // Run the RPC server + server.start().await; // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + ctrlc_r.recv().await.expect("Wait for ctrlc signal"); // Shutdown the backend backend.shutdown().await; - monitor_task.cancel().await; - monitor_task2.cancel().await; - monitor_task3.cancel().await; + // Shutdown the RPC server + server.shutdown().await; }, ex, ); -- cgit v1.2.3