diff options
Diffstat (limited to 'p2p/examples')
-rw-r--r-- | p2p/examples/monitor.rs | 183 |
1 files changed, 177 insertions, 6 deletions
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 5382781..1629207 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,14 +1,16 @@ mod shared; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use clap::Parser; use log::error; +use serde::{Deserialize, Serialize}; use smol::{channel, Executor}; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, + monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, ArcBackend, Backend, Config, }; @@ -81,8 +83,57 @@ impl MonitorRPC { let conn_events = self.backend.monitor().conn_events().await; smol::spawn(async move { loop { - let _event = conn_events.recv().await; - if let Err(err) = sub.notify(serde_json::json!("event")).await { + let event = conn_events.recv().await.unwrap(); + let event: ConnEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn peer_pool_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let peer_pool_events = self.backend.monitor().peer_pool_events().await; + smol::spawn(async move { + loop { + let event = peer_pool_events.recv().await.unwrap(); + let event: PeerPoolEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn discovery_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let discovery_events = self.backend.monitor().discovery_events().await; + smol::spawn(async move { + loop { + let event = discovery_events.recv().await.unwrap(); + let event: DiscoveryEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { error!("Failed to notify: {err}"); break; } @@ -103,6 +154,28 @@ impl MonitorRPC { chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) } + + async fn peer_pool_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } + + async fn discovery_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } } fn main() { @@ -148,12 +221,12 @@ fn main() { .await .expect("Build rpc server"); - // Run the backend - backend.run().await.expect("Run p2p backend"); - // Run the RPC server server.start().await; + // Run the backend + backend.run().await.expect("Run p2p backend"); + // Wait for ctrlc signal ctrlc_r.recv().await.expect("Wait for ctrlc signal"); @@ -166,3 +239,101 @@ fn main() { ex, ); } + +#[derive(Debug, Serialize, Deserialize)] +struct ConnEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PeerPoolEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + peer_id: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +impl From<ConnEvent> for ConnEventJson { + fn from(item: ConnEvent) -> Self { + match item { + ConnEvent::Connected(ref e) => ConnEventJson { + name: "Connected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Disconnected(e) => ConnEventJson { + name: "Disconnected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectFailed(e) => ConnEventJson { + name: "ConnectFailed".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectRetried(e) => ConnEventJson { + name: "ConnectRetried".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Accepted(e) => ConnEventJson { + name: "Accepted".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::AcceptFailed => ConnEventJson { + name: "AcceptFailed".into(), + endpoint: None, + }, + ConnEvent::Listening(e) => ConnEventJson { + name: "Listening".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ListenFailed(e) => ConnEventJson { + name: "ListenFailed".into(), + endpoint: Some(e.to_string()), + }, + } + } +} + +impl From<PeerPoolEvent> for PeerPoolEventJson { + fn from(item: PeerPoolEvent) -> Self { + match item { + PeerPoolEvent::NewPeer(id) => PeerPoolEventJson { + name: "NewPeer".into(), + peer_id: Some(id.to_string()), + }, + PeerPoolEvent::RemovePeer(id) => PeerPoolEventJson { + name: "RemovePeer".into(), + peer_id: Some(id.to_string()), + }, + } + } +} + +impl From<DiscoveryEvent> for DiscoveryEventJson { + fn from(item: DiscoveryEvent) -> Self { + match item { + DiscoveryEvent::RefreshStarted => DiscoveryEventJson { + name: "RefreshStarted".into(), + endpoint: None, + }, + DiscoveryEvent::LookupStarted(e) => DiscoveryEventJson { + name: "LookupStarted".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupFailed(e) => DiscoveryEventJson { + name: "LookupFailed".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupSucceeded(e, _) => DiscoveryEventJson { + name: "LookupSucceeded".into(), + endpoint: Some(e.to_string()), + }, + } + } +} |