aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples/monitor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/examples/monitor.rs')
-rw-r--r--p2p/examples/monitor.rs183
1 files changed, 177 insertions, 6 deletions
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index 5382781..1629207 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -1,14 +1,16 @@
mod shared;
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
use clap::Parser;
use log::error;
+use serde::{Deserialize, Serialize};
use smol::{channel, Executor};
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
+ monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent},
ArcBackend, Backend, Config,
};
@@ -81,8 +83,57 @@ impl MonitorRPC {
let conn_events = self.backend.monitor().conn_events().await;
smol::spawn(async move {
loop {
- let _event = conn_events.recv().await;
- if let Err(err) = sub.notify(serde_json::json!("event")).await {
+ let event = conn_events.recv().await.unwrap();
+ let event: ConnEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn peer_pool_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let peer_pool_events = self.backend.monitor().peer_pool_events().await;
+ smol::spawn(async move {
+ loop {
+ let event = peer_pool_events.recv().await.unwrap();
+ let event: PeerPoolEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn discovery_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let discovery_events = self.backend.monitor().discovery_events().await;
+ smol::spawn(async move {
+ loop {
+ let event = discovery_events.recv().await.unwrap();
+ let event: DiscoveryEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
error!("Failed to notify: {err}");
break;
}
@@ -103,6 +154,28 @@ impl MonitorRPC {
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
}
+
+ async fn peer_pool_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ async fn discovery_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
}
fn main() {
@@ -148,12 +221,12 @@ fn main() {
.await
.expect("Build rpc server");
- // Run the backend
- backend.run().await.expect("Run p2p backend");
-
// Run the RPC server
server.start().await;
+ // Run the backend
+ backend.run().await.expect("Run p2p backend");
+
// Wait for ctrlc signal
ctrlc_r.recv().await.expect("Wait for ctrlc signal");
@@ -166,3 +239,101 @@ fn main() {
ex,
);
}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct ConnEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ endpoint: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct PeerPoolEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ peer_id: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct DiscoveryEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ endpoint: Option<String>,
+}
+
+impl From<ConnEvent> for ConnEventJson {
+ fn from(item: ConnEvent) -> Self {
+ match item {
+ ConnEvent::Connected(ref e) => ConnEventJson {
+ name: "Connected".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::Disconnected(e) => ConnEventJson {
+ name: "Disconnected".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ConnectFailed(e) => ConnEventJson {
+ name: "ConnectFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ConnectRetried(e) => ConnEventJson {
+ name: "ConnectRetried".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::Accepted(e) => ConnEventJson {
+ name: "Accepted".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::AcceptFailed => ConnEventJson {
+ name: "AcceptFailed".into(),
+ endpoint: None,
+ },
+ ConnEvent::Listening(e) => ConnEventJson {
+ name: "Listening".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ListenFailed(e) => ConnEventJson {
+ name: "ListenFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ }
+ }
+}
+
+impl From<PeerPoolEvent> for PeerPoolEventJson {
+ fn from(item: PeerPoolEvent) -> Self {
+ match item {
+ PeerPoolEvent::NewPeer(id) => PeerPoolEventJson {
+ name: "NewPeer".into(),
+ peer_id: Some(id.to_string()),
+ },
+ PeerPoolEvent::RemovePeer(id) => PeerPoolEventJson {
+ name: "RemovePeer".into(),
+ peer_id: Some(id.to_string()),
+ },
+ }
+ }
+}
+
+impl From<DiscoveryEvent> for DiscoveryEventJson {
+ fn from(item: DiscoveryEvent) -> Self {
+ match item {
+ DiscoveryEvent::RefreshStarted => DiscoveryEventJson {
+ name: "RefreshStarted".into(),
+ endpoint: None,
+ },
+ DiscoveryEvent::LookupStarted(e) => DiscoveryEventJson {
+ name: "LookupStarted".into(),
+ endpoint: Some(e.to_string()),
+ },
+ DiscoveryEvent::LookupFailed(e) => DiscoveryEventJson {
+ name: "LookupFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ DiscoveryEvent::LookupSucceeded(e, _) => DiscoveryEventJson {
+ name: "LookupSucceeded".into(),
+ endpoint: Some(e.to_string()),
+ },
+ }
+ }
+}