aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/examples')
-rw-r--r--p2p/examples/monitor.rs120
1 files changed, 91 insertions, 29 deletions
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index 32c8959..5382781 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -3,14 +3,17 @@ mod shared;
use std::sync::Arc;
use clap::Parser;
+use log::error;
use smol::{channel, Executor};
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
- Backend, Config,
+ ArcBackend, Backend, Config,
};
+use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, SubscriptionID};
+
use shared::run_executor;
#[derive(Parser)]
@@ -20,6 +23,10 @@ struct Cli {
#[arg(short)]
bootstrap_peers: Vec<Endpoint>,
+ /// RPC server endpoint.
+ #[arg(short)]
+ rpc_endpoint: Endpoint,
+
/// Optional list of peer endpoints for manual connections.
#[arg(short)]
peer_endpoints: Vec<Endpoint>,
@@ -33,6 +40,71 @@ struct Cli {
discovery_port: Option<Port>,
}
+struct MonitorRPC {
+ backend: ArcBackend,
+}
+
+#[rpc_impl]
+impl MonitorRPC {
+ async fn peer_id(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.peer_id().to_string()))
+ }
+
+ async fn inbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+
+ async fn outbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+}
+
+#[rpc_pubsub_impl]
+impl MonitorRPC {
+ async fn conn_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let conn_events = self.backend.monitor().conn_events().await;
+ smol::spawn(async move {
+ loop {
+ let _event = conn_events.recv().await;
+ if let Err(err) = sub.notify(serde_json::json!("event")).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn conn_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
fn main() {
env_logger::init();
let cli = Cli::parse();
@@ -62,44 +134,34 @@ fn main() {
let exc = ex.clone();
run_executor(
async {
- let monitor = backend.monitor();
- let conn_listener = monitor.conn_events().await;
- let peerpool_listener = monitor.peer_pool_events().await;
- let discovery_listener = monitor.discovery_events().await;
-
- let monitor_task = exc.spawn(async move {
- loop {
- let event = conn_listener.recv().await.unwrap();
- println!("New connection event: {}", event);
- }
- });
-
- let monitor_task2 = exc.spawn(async move {
- loop {
- let event = peerpool_listener.recv().await.unwrap();
- println!("New peer pool event: {}", event);
- }
+ // RPC service
+ let service = Arc::new(MonitorRPC {
+ backend: backend.clone(),
});
- let monitor_task3 = exc.spawn(async move {
- loop {
- let event = discovery_listener.recv().await.unwrap();
- println!("New discovery event: {}", event);
- }
- });
+ // Create rpc server
+ let server = Server::builder(cli.rpc_endpoint)
+ .expect("Create server builder")
+ .service(service.clone())
+ .pubsub_service(service)
+ .build_with_executor(exc.clone().into())
+ .await
+ .expect("Build rpc server");
// Run the backend
- backend.run().await.unwrap();
+ backend.run().await.expect("Run p2p backend");
+
+ // Run the RPC server
+ server.start().await;
// Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ ctrlc_r.recv().await.expect("Wait for ctrlc signal");
// Shutdown the backend
backend.shutdown().await;
- monitor_task.cancel().await;
- monitor_task2.cancel().await;
- monitor_task3.cancel().await;
+ // Shutdown the RPC server
+ server.shutdown().await;
},
ex,
);