diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/Cargo.toml | 1 | ||||
-rw-r--r-- | p2p/examples/monitor.rs | 183 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 6 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 16 | ||||
-rw-r--r-- | p2p/src/monitor.rs | 2 |
5 files changed, 185 insertions, 23 deletions
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 83df9c0..22236e3 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -58,3 +58,4 @@ env_logger = "0.11.3" smol = "2.0.0" karyon_jsonrpc = { workspace = true, features = ["ws", "smol"] } serde_json = "1.0.117" +serde = { version = "1.0.202", features = ["derive"] } diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 5382781..1629207 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,14 +1,16 @@ mod shared; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use clap::Parser; use log::error; +use serde::{Deserialize, Serialize}; use smol::{channel, Executor}; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, + monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, ArcBackend, Backend, Config, }; @@ -81,8 +83,57 @@ impl MonitorRPC { let conn_events = self.backend.monitor().conn_events().await; smol::spawn(async move { loop { - let _event = conn_events.recv().await; - if let Err(err) = sub.notify(serde_json::json!("event")).await { + let event = conn_events.recv().await.unwrap(); + let event: ConnEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn peer_pool_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let peer_pool_events = self.backend.monitor().peer_pool_events().await; + smol::spawn(async move { + loop { + let event = peer_pool_events.recv().await.unwrap(); + let event: PeerPoolEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn discovery_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let discovery_events = self.backend.monitor().discovery_events().await; + smol::spawn(async move { + loop { + let event = discovery_events.recv().await.unwrap(); + let event: DiscoveryEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { error!("Failed to notify: {err}"); break; } @@ -103,6 +154,28 @@ impl MonitorRPC { chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) } + + async fn peer_pool_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } + + async fn discovery_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } } fn main() { @@ -148,12 +221,12 @@ fn main() { .await .expect("Build rpc server"); - // Run the backend - backend.run().await.expect("Run p2p backend"); - // Run the RPC server server.start().await; + // Run the backend + backend.run().await.expect("Run p2p backend"); + // Wait for ctrlc signal ctrlc_r.recv().await.expect("Wait for ctrlc signal"); @@ -166,3 +239,101 @@ fn main() { ex, ); } + +#[derive(Debug, Serialize, Deserialize)] +struct ConnEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PeerPoolEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + peer_id: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +impl From<ConnEvent> for ConnEventJson { + fn from(item: ConnEvent) -> Self { + match item { + ConnEvent::Connected(ref e) => ConnEventJson { + name: "Connected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Disconnected(e) => ConnEventJson { + name: "Disconnected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectFailed(e) => ConnEventJson { + name: "ConnectFailed".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectRetried(e) => ConnEventJson { + name: "ConnectRetried".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Accepted(e) => ConnEventJson { + name: "Accepted".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::AcceptFailed => ConnEventJson { + name: "AcceptFailed".into(), + endpoint: None, + }, + ConnEvent::Listening(e) => ConnEventJson { + name: "Listening".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ListenFailed(e) => ConnEventJson { + name: "ListenFailed".into(), + endpoint: Some(e.to_string()), + }, + } + } +} + +impl From<PeerPoolEvent> for PeerPoolEventJson { + fn from(item: PeerPoolEvent) -> Self { + match item { + PeerPoolEvent::NewPeer(id) => PeerPoolEventJson { + name: "NewPeer".into(), + peer_id: Some(id.to_string()), + }, + PeerPoolEvent::RemovePeer(id) => PeerPoolEventJson { + name: "RemovePeer".into(), + peer_id: Some(id.to_string()), + }, + } + } +} + +impl From<DiscoveryEvent> for DiscoveryEventJson { + fn from(item: DiscoveryEvent) -> Self { + match item { + DiscoveryEvent::RefreshStarted => DiscoveryEventJson { + name: "RefreshStarted".into(), + endpoint: None, + }, + DiscoveryEvent::LookupStarted(e) => DiscoveryEventJson { + name: "LookupStarted".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupFailed(e) => DiscoveryEventJson { + name: "LookupFailed".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupSucceeded(e, _) => DiscoveryEventJson { + name: "LookupSucceeded".into(), + endpoint: Some(e.to_string()), + }, + } + } +} diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 613c3cd..4a06083 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -234,14 +234,12 @@ impl LookupService { ) -> Result<Vec<PeerMsg>> { let conn = self.connector.connect(&endpoint, &peer_id).await?; self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone()))) + .notify(ConnEvent::Connected(endpoint.clone())) .await; let result = self.handle_outbound(conn, target_peer_id).await; - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) - .await; + self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; self.outbound_slots.remove().await; result diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 430e749..745a5d5 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -211,15 +211,13 @@ impl RefreshService { let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone()))) + .notify(ConnEvent::Listening(endpoint.clone())) .await; c } Err(err) => { self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed( - endpoint.clone(), - ))) + .notify(ConnEvent::ListenFailed(endpoint.clone())) .await; return Err(err.into()); } @@ -230,9 +228,7 @@ impl RefreshService { let res = self.listen_to_ping_msg(&conn).await; if let Err(err) = res { trace!("Failed to handle ping msg {err}"); - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed)) - .await; + self.monitor.notify(ConnEvent::AcceptFailed).await; } } } @@ -241,7 +237,7 @@ impl RefreshService { async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> { let (msg, endpoint) = conn.recv().await?; self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone()))) + .notify(ConnEvent::Accepted(endpoint.clone())) .await; match msg { @@ -252,9 +248,7 @@ impl RefreshService { RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())), } - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) - .await; + self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; Ok(()) } diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index 945c6aa..b7afb7c 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -107,7 +107,6 @@ pub enum PeerPoolEvent { #[derive(Clone, Debug)] pub enum DiscoveryEvent { LookupStarted(Endpoint), - Conn(ConnEvent), LookupFailed(Endpoint), LookupSucceeded(Endpoint, usize), RefreshStarted, @@ -143,7 +142,6 @@ impl fmt::Display for DiscoveryEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), - DiscoveryEvent::Conn(event) => format!("Connection event: {event}"), DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), DiscoveryEvent::LookupSucceeded(endpoint, len) => { format!("LookupSucceeded: {endpoint} {len}") |