From e3d1f4fd91a5f077fda8a1976e194c378ee166d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 13:25:39 +0200 Subject: p2p/monitor: use struct instead of enum for monitor events --- p2p/src/discovery/lookup.rs | 8 +- p2p/src/discovery/refresh.rs | 4 +- p2p/src/monitor.rs | 198 ---------------------------------------- p2p/src/monitor/event.rs | 95 ++++++++++++++++++++ p2p/src/monitor/mod.rs | 209 +++++++++++++++++++++++++++++++++++++++++++ p2p/src/peer_pool.rs | 10 +-- 6 files changed, 313 insertions(+), 211 deletions(-) delete mode 100644 p2p/src/monitor.rs create mode 100644 p2p/src/monitor/event.rs create mode 100644 p2p/src/monitor/mod.rs diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 677bad7..a941986 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -20,7 +20,7 @@ use crate::{ get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, ShutdownMsg, }, - monitor::{ConnEvent, DiscoveryEvent, Monitor}, + monitor::{ConnEvent, DiscvEvent, Monitor}, routing_table::RoutingTable, slots::ConnectionSlots, version::version_match, @@ -131,7 +131,7 @@ impl LookupService { pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option) -> Result<()> { trace!("Lookup started {endpoint}"); self.monitor - .notify(DiscoveryEvent::LookupStarted(endpoint.clone())) + .notify(DiscvEvent::LookupStarted(endpoint.clone())) .await; let mut random_peers = vec![]; @@ -140,7 +140,7 @@ impl LookupService { .await { self.monitor - .notify(DiscoveryEvent::LookupFailed(endpoint.clone())) + .notify(DiscvEvent::LookupFailed(endpoint.clone())) .await; return Err(err); }; @@ -161,7 +161,7 @@ impl LookupService { } self.monitor - .notify(DiscoveryEvent::LookupSucceeded( + .notify(DiscvEvent::LookupSucceeded( endpoint.clone(), peer_buffer.len(), )) diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index eec6743..c1d222b 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -14,7 +14,7 @@ use karyon_net::{udp, Connection, Endpoint, Error as NetError}; use crate::{ codec::RefreshMsgCodec, message::RefreshMsg, - monitor::{ConnEvent, DiscoveryEvent, Monitor}, + monitor::{ConnEvent, DiscvEvent, Monitor}, routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY}, Config, Error, Result, }; @@ -116,7 +116,7 @@ impl RefreshService { sleep(Duration::from_secs(self.config.refresh_interval)).await; trace!("Start refreshing the routing table..."); - self.monitor.notify(DiscoveryEvent::RefreshStarted).await; + self.monitor.notify(DiscvEvent::RefreshStarted).await; let mut entries: Vec = vec![]; for bucket in self.table.buckets() { diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs deleted file mode 100644 index 4d6a46c..0000000 --- a/p2p/src/monitor.rs +++ /dev/null @@ -1,198 +0,0 @@ -use std::{fmt, sync::Arc}; - -use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; - -use karyon_net::Endpoint; - -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - -use crate::{Config, PeerID}; - -/// Responsible for network and system monitoring. -/// -/// It use pub-sub pattern to notify the subscribers with new events. -/// -/// # Example -/// -/// ``` -/// use std::sync::Arc; -/// -/// use smol::Executor; -/// -/// use karyon_p2p::{Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}}; -/// -/// async { -/// -/// // Create a new Executor -/// let ex = Arc::new(Executor::new()); -/// -/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); -/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); -/// -/// // Create a new Subscription -/// let monitor = backend.monitor(); -/// -/// let listener = monitor.conn_events().await; -/// -/// let new_event = listener.recv().await; -/// }; -/// ``` -pub struct Monitor { - event_sys: ArcEventSys, - config: Arc, -} - -impl Monitor { - /// Creates a new Monitor - pub(crate) fn new(config: Arc) -> Self { - Self { - event_sys: EventSys::new(), - config, - } - } - - /// Sends a new monitor event to subscribers. - pub(crate) async fn notify(&self, event: E) - where - E: EventValue + Clone + EventValueTopic, - { - if self.config.enable_monitor { - self.event_sys.emit(&event).await - } - } - - /// Registers a new event listener for connection events. - pub async fn conn_events(&self) -> EventListener { - self.event_sys.register(&MonitorTopic::Conn).await - } - - /// Registers a new event listener for peer pool events. - pub async fn peer_pool_events(&self) -> EventListener { - self.event_sys.register(&MonitorTopic::PeerPool).await - } - - /// Registers a new event listener for discovery events. - pub async fn discovery_events(&self) -> EventListener { - self.event_sys.register(&MonitorTopic::Discovery).await - } -} - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum MonitorTopic { - Conn, - PeerPool, - Discovery, -} - -/// Defines connection-related events. -#[derive(Clone, Debug)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum ConnEvent { - Connected(Endpoint), - ConnectRetried(Endpoint), - ConnectFailed(Endpoint), - Accepted(Endpoint), - AcceptFailed, - Disconnected(Endpoint), - Listening(Endpoint), - ListenFailed(Endpoint), -} - -/// Defines `PeerPool` events. -#[derive(Clone, Debug)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum PeerPoolEvent { - NewPeer(PeerID), - RemovePeer(PeerID), -} - -/// Defines `Discovery` events. -#[derive(Clone, Debug)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum DiscoveryEvent { - LookupStarted(Endpoint), - LookupFailed(Endpoint), - LookupSucceeded(Endpoint, usize), - RefreshStarted, -} - -impl fmt::Display for ConnEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - ConnEvent::Connected(endpoint) => format!("Connected: {endpoint}"), - ConnEvent::ConnectFailed(endpoint) => format!("ConnectFailed: {endpoint}"), - ConnEvent::ConnectRetried(endpoint) => format!("ConnectRetried: {endpoint}"), - ConnEvent::AcceptFailed => "AcceptFailed".to_string(), - ConnEvent::Accepted(endpoint) => format!("Accepted: {endpoint}"), - ConnEvent::Disconnected(endpoint) => format!("Disconnected: {endpoint}"), - ConnEvent::Listening(endpoint) => format!("Listening: {endpoint}"), - ConnEvent::ListenFailed(endpoint) => format!("ListenFailed: {endpoint}"), - }; - write!(f, "{}", val) - } -} - -impl fmt::Display for PeerPoolEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - PeerPoolEvent::NewPeer(pid) => format!("NewPeer: {pid}"), - PeerPoolEvent::RemovePeer(pid) => format!("RemovePeer: {pid}"), - }; - write!(f, "{}", val) - } -} - -impl fmt::Display for DiscoveryEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), - DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), - DiscoveryEvent::LookupSucceeded(endpoint, len) => { - format!("LookupSucceeded: {endpoint} {len}") - } - DiscoveryEvent::RefreshStarted => "RefreshStarted".to_string(), - }; - write!(f, "{}", val) - } -} - -impl EventValue for ConnEvent { - fn id() -> &'static str { - "ConnEvent" - } -} - -impl EventValue for PeerPoolEvent { - fn id() -> &'static str { - "PeerPoolEvent" - } -} - -impl EventValue for DiscoveryEvent { - fn id() -> &'static str { - "DiscoveryEvent" - } -} - -impl EventValueTopic for ConnEvent { - type Topic = MonitorTopic; - fn topic() -> Self::Topic { - MonitorTopic::Conn - } -} - -impl EventValueTopic for PeerPoolEvent { - type Topic = MonitorTopic; - fn topic() -> Self::Topic { - MonitorTopic::PeerPool - } -} - -impl EventValueTopic for DiscoveryEvent { - type Topic = MonitorTopic; - fn topic() -> Self::Topic { - MonitorTopic::Discovery - } -} diff --git a/p2p/src/monitor/event.rs b/p2p/src/monitor/event.rs new file mode 100644 index 0000000..d320e37 --- /dev/null +++ b/p2p/src/monitor/event.rs @@ -0,0 +1,95 @@ +use karyon_net::Endpoint; + +use crate::PeerID; + +/// Defines connection-related events. +#[derive(Clone, Debug)] +pub enum ConnEvent { + Connected(Endpoint), + ConnectRetried(Endpoint), + ConnectFailed(Endpoint), + Accepted(Endpoint), + AcceptFailed, + Disconnected(Endpoint), + Listening(Endpoint), + ListenFailed(Endpoint), +} + +/// Defines `PP` events. +#[derive(Clone, Debug)] +pub enum PPEvent { + NewPeer(PeerID), + RemovePeer(PeerID), +} + +/// Defines `Discovery` events. +#[derive(Clone, Debug)] +pub enum DiscvEvent { + LookupStarted(Endpoint), + LookupFailed(Endpoint), + LookupSucceeded(Endpoint, usize), + RefreshStarted, +} + +impl ConnEvent { + pub(super) fn get_endpoint(&self) -> Option<&Endpoint> { + match self { + ConnEvent::Connected(endpoint) + | ConnEvent::ConnectRetried(endpoint) + | ConnEvent::ConnectFailed(endpoint) + | ConnEvent::Accepted(endpoint) + | ConnEvent::Disconnected(endpoint) + | ConnEvent::Listening(endpoint) + | ConnEvent::ListenFailed(endpoint) => Some(endpoint), + ConnEvent::AcceptFailed => None, + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + ConnEvent::Connected(_) => "Connected", + ConnEvent::ConnectRetried(_) => "ConnectRetried", + ConnEvent::ConnectFailed(_) => "ConnectFailed", + ConnEvent::Accepted(_) => "Accepted", + ConnEvent::AcceptFailed => "AcceptFailed", + ConnEvent::Disconnected(_) => "Disconnected", + ConnEvent::Listening(_) => "Listening", + ConnEvent::ListenFailed(_) => "ListenFailed", + } + } +} + +impl PPEvent { + pub(super) fn get_peer_id(&self) -> Option<&PeerID> { + match self { + PPEvent::NewPeer(peer_id) | PPEvent::RemovePeer(peer_id) => Some(peer_id), + } + } + pub(super) fn variant_name(&self) -> &'static str { + match self { + PPEvent::NewPeer(_) => "NewPeer", + PPEvent::RemovePeer(_) => "RemovePeer", + } + } +} + +impl DiscvEvent { + pub(super) fn get_endpoint_and_size(&self) -> (Option<&Endpoint>, Option) { + match self { + DiscvEvent::LookupStarted(endpoint) | DiscvEvent::LookupFailed(endpoint) => { + (Some(endpoint), None) + } + DiscvEvent::LookupSucceeded(endpoint, size) => (Some(endpoint), Some(*size)), + DiscvEvent::RefreshStarted => (None, None), + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + DiscvEvent::LookupStarted(_) => "LookupStarted", + DiscvEvent::LookupFailed(_) => "LookupFailed", + DiscvEvent::LookupSucceeded(_, _) => "LookupSucceeded", + DiscvEvent::RefreshStarted => "RefreshStarted", + } + } +} diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs new file mode 100644 index 0000000..34d252e --- /dev/null +++ b/p2p/src/monitor/mod.rs @@ -0,0 +1,209 @@ +mod event; + +use std::sync::Arc; + +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; + +use karyon_net::Endpoint; + +pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent}; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use crate::{Config, PeerID}; + +/// Responsible for network and system monitoring. +/// +/// It use pub-sub pattern to notify the subscribers with new events. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +/// +/// use karyon_p2p::{ +/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent, +/// }; +/// +/// async { +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); +/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); +/// +/// // Create a new Subscription +/// let monitor = backend.monitor(); +/// +/// let listener = monitor.register::().await; +/// +/// let new_event = listener.recv().await; +/// }; +/// ``` +pub struct Monitor { + event_sys: ArcEventSys, + config: Arc, +} + +impl Monitor { + /// Creates a new Monitor + pub(crate) fn new(config: Arc) -> Self { + Self { + event_sys: EventSys::new(), + config, + } + } + + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify(&self, event: E) { + if self.config.enable_monitor { + let event = event.to_struct(); + self.event_sys.emit(&event).await + } + } + + /// Registers a new event listener for the provided topic. + pub async fn register(&self) -> EventListener + where + E: Clone + EventValue + EventValueTopic, + { + self.event_sys.register(&E::topic()).await + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum MonitorTopic { + Connection, + PeerPool, + Discovery, +} + +pub(super) trait ToEventStruct: Sized { + type EventStruct: From + Clone + EventValueTopic + EventValue; + fn to_struct(self) -> Self::EventStruct { + self.into() + } +} + +impl ToEventStruct for ConnEvent { + type EventStruct = ConnectionEvent; +} + +impl ToEventStruct for PPEvent { + type EventStruct = PeerPoolEvent; +} + +impl ToEventStruct for DiscvEvent { + type EventStruct = DiscoveryEvent; +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct PeerPoolEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub peer_id: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DiscoveryEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub size: Option, +} + +impl From for ConnectionEvent { + fn from(event: ConnEvent) -> Self { + let endpoint = event.get_endpoint().cloned(); + Self { + endpoint, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for PeerPoolEvent { + fn from(event: PPEvent) -> Self { + let peer_id = event.get_peer_id().cloned(); + Self { + peer_id, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for DiscoveryEvent { + fn from(event: DiscvEvent) -> Self { + let (endpoint, size) = event.get_endpoint_and_size(); + Self { + endpoint: endpoint.cloned(), + size, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl EventValue for ConnectionEvent { + fn id() -> &'static str { + "ConnectionEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnectionEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Connection + } +} + +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool + } +} + +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery + } +} + +fn get_current_timestamp() -> i64 { + chrono::Utc::now().timestamp() +} diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 79fd4b4..6c895a0 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -23,7 +23,7 @@ use crate::{ config::Config, conn_queue::{ConnDirection, ConnQueue}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, - monitor::{Monitor, PeerPoolEvent}, + monitor::{Monitor, PPEvent}, peer::{ArcPeer, Peer, PeerID}, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, @@ -192,9 +192,7 @@ impl PeerPool { info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); - self.monitor - .notify(PeerPoolEvent::NewPeer(pid.clone())) - .await; + self.monitor.notify(PPEvent::NewPeer(pid.clone())).await; Ok(()) } @@ -215,9 +213,7 @@ impl PeerPool { peer.shutdown().await; - self.monitor - .notify(PeerPoolEvent::RemovePeer(pid.clone())) - .await; + self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await; let endpoint = peer.remote_endpoint(); let direction = peer.direction(); -- cgit v1.2.3