diff options
author | hozan23 <hozan23@karyontech.net> | 2024-05-23 13:27:43 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-05-23 13:27:43 +0200 |
commit | 681cfce4f412958dbb6dc8a1c07408826b4387a0 (patch) | |
tree | 9278976be1c59ce47f1833276007f5b45e99c2de | |
parent | d51f212628f4996d754745b4904a1994ba39a2d0 (diff) |
p2p: add peer pool logs and discovery logs to example/monitor.rs
-rw-r--r-- | Cargo.lock | 9 | ||||
-rw-r--r-- | core/src/event.rs | 10 | ||||
-rw-r--r-- | p2p/Cargo.toml | 1 | ||||
-rw-r--r-- | p2p/examples/monitor.rs | 183 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 6 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 16 | ||||
-rw-r--r-- | p2p/src/monitor.rs | 2 |
7 files changed, 193 insertions, 34 deletions
@@ -1326,6 +1326,7 @@ dependencies = [ "rcgen 0.12.1", "rustls-pki-types", "semver", + "serde", "serde_json", "sha2", "smol", @@ -1961,18 +1962,18 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.201" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c" +checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.201" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" +checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", diff --git a/core/src/event.rs b/core/src/event.rs index 876fda5..a32677b 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -243,9 +243,9 @@ where /// Cancels the listener and removes it from the `EventSys`. pub async fn cancel(&self) { - self.event_sys() - .remove(&self.topic, &self.event_id, &self.id) - .await; + if let Some(es) = self.event_sys.upgrade() { + es.remove(&self.topic, &self.event_id, &self.id).await; + } } /// Returns the topic for this event listener. @@ -257,10 +257,6 @@ where pub async fn event_id(&self) -> &String { &self.event_id } - - fn event_sys(&self) -> ArcEventSys<T> { - self.event_sys.upgrade().unwrap() - } } /// An event within the [`EventSys`]. diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 83df9c0..22236e3 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -58,3 +58,4 @@ env_logger = "0.11.3" smol = "2.0.0" karyon_jsonrpc = { workspace = true, features = ["ws", "smol"] } serde_json = "1.0.117" +serde = { version = "1.0.202", features = ["derive"] } diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 5382781..1629207 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,14 +1,16 @@ mod shared; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use clap::Parser; use log::error; +use serde::{Deserialize, Serialize}; use smol::{channel, Executor}; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, + monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, ArcBackend, Backend, Config, }; @@ -81,8 +83,57 @@ impl MonitorRPC { let conn_events = self.backend.monitor().conn_events().await; smol::spawn(async move { loop { - let _event = conn_events.recv().await; - if let Err(err) = sub.notify(serde_json::json!("event")).await { + let event = conn_events.recv().await.unwrap(); + let event: ConnEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn peer_pool_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let peer_pool_events = self.backend.monitor().peer_pool_events().await; + smol::spawn(async move { + loop { + let event = peer_pool_events.recv().await.unwrap(); + let event: PeerPoolEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn discovery_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let discovery_events = self.backend.monitor().discovery_events().await; + smol::spawn(async move { + loop { + let event = discovery_events.recv().await.unwrap(); + let event: DiscoveryEventJson = event.into(); + if let Err(err) = sub.notify(serde_json::json!(event)).await { error!("Failed to notify: {err}"); break; } @@ -103,6 +154,28 @@ impl MonitorRPC { chan.remove_subscription(&sub_id).await; Ok(serde_json::json!(true)) } + + async fn peer_pool_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } + + async fn discovery_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } } fn main() { @@ -148,12 +221,12 @@ fn main() { .await .expect("Build rpc server"); - // Run the backend - backend.run().await.expect("Run p2p backend"); - // Run the RPC server server.start().await; + // Run the backend + backend.run().await.expect("Run p2p backend"); + // Wait for ctrlc signal ctrlc_r.recv().await.expect("Wait for ctrlc signal"); @@ -166,3 +239,101 @@ fn main() { ex, ); } + +#[derive(Debug, Serialize, Deserialize)] +struct ConnEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PeerPoolEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + peer_id: Option<String>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryEventJson { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option<String>, +} + +impl From<ConnEvent> for ConnEventJson { + fn from(item: ConnEvent) -> Self { + match item { + ConnEvent::Connected(ref e) => ConnEventJson { + name: "Connected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Disconnected(e) => ConnEventJson { + name: "Disconnected".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectFailed(e) => ConnEventJson { + name: "ConnectFailed".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ConnectRetried(e) => ConnEventJson { + name: "ConnectRetried".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::Accepted(e) => ConnEventJson { + name: "Accepted".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::AcceptFailed => ConnEventJson { + name: "AcceptFailed".into(), + endpoint: None, + }, + ConnEvent::Listening(e) => ConnEventJson { + name: "Listening".into(), + endpoint: Some(e.to_string()), + }, + ConnEvent::ListenFailed(e) => ConnEventJson { + name: "ListenFailed".into(), + endpoint: Some(e.to_string()), + }, + } + } +} + +impl From<PeerPoolEvent> for PeerPoolEventJson { + fn from(item: PeerPoolEvent) -> Self { + match item { + PeerPoolEvent::NewPeer(id) => PeerPoolEventJson { + name: "NewPeer".into(), + peer_id: Some(id.to_string()), + }, + PeerPoolEvent::RemovePeer(id) => PeerPoolEventJson { + name: "RemovePeer".into(), + peer_id: Some(id.to_string()), + }, + } + } +} + +impl From<DiscoveryEvent> for DiscoveryEventJson { + fn from(item: DiscoveryEvent) -> Self { + match item { + DiscoveryEvent::RefreshStarted => DiscoveryEventJson { + name: "RefreshStarted".into(), + endpoint: None, + }, + DiscoveryEvent::LookupStarted(e) => DiscoveryEventJson { + name: "LookupStarted".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupFailed(e) => DiscoveryEventJson { + name: "LookupFailed".into(), + endpoint: Some(e.to_string()), + }, + DiscoveryEvent::LookupSucceeded(e, _) => DiscoveryEventJson { + name: "LookupSucceeded".into(), + endpoint: Some(e.to_string()), + }, + } + } +} diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 613c3cd..4a06083 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -234,14 +234,12 @@ impl LookupService { ) -> Result<Vec<PeerMsg>> { let conn = self.connector.connect(&endpoint, &peer_id).await?; self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone()))) + .notify(ConnEvent::Connected(endpoint.clone())) .await; let result = self.handle_outbound(conn, target_peer_id).await; - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) - .await; + self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; self.outbound_slots.remove().await; result diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 430e749..745a5d5 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -211,15 +211,13 @@ impl RefreshService { let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone()))) + .notify(ConnEvent::Listening(endpoint.clone())) .await; c } Err(err) => { self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed( - endpoint.clone(), - ))) + .notify(ConnEvent::ListenFailed(endpoint.clone())) .await; return Err(err.into()); } @@ -230,9 +228,7 @@ impl RefreshService { let res = self.listen_to_ping_msg(&conn).await; if let Err(err) = res { trace!("Failed to handle ping msg {err}"); - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed)) - .await; + self.monitor.notify(ConnEvent::AcceptFailed).await; } } } @@ -241,7 +237,7 @@ impl RefreshService { async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> { let (msg, endpoint) = conn.recv().await?; self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone()))) + .notify(ConnEvent::Accepted(endpoint.clone())) .await; match msg { @@ -252,9 +248,7 @@ impl RefreshService { RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())), } - self.monitor - .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) - .await; + self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; Ok(()) } diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index 945c6aa..b7afb7c 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -107,7 +107,6 @@ pub enum PeerPoolEvent { #[derive(Clone, Debug)] pub enum DiscoveryEvent { LookupStarted(Endpoint), - Conn(ConnEvent), LookupFailed(Endpoint), LookupSucceeded(Endpoint, usize), RefreshStarted, @@ -143,7 +142,6 @@ impl fmt::Display for DiscoveryEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), - DiscoveryEvent::Conn(event) => format!("Connection event: {event}"), DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), DiscoveryEvent::LookupSucceeded(endpoint, len) => { format!("LookupSucceeded: {endpoint} {len}") |