From e3d1f4fd91a5f077fda8a1976e194c378ee166d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 13:25:39 +0200 Subject: p2p/monitor: use struct instead of enum for monitor events --- p2p/src/monitor/event.rs | 95 +++++++++++++++++++++ p2p/src/monitor/mod.rs | 209 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 304 insertions(+) create mode 100644 p2p/src/monitor/event.rs create mode 100644 p2p/src/monitor/mod.rs (limited to 'p2p/src/monitor') diff --git a/p2p/src/monitor/event.rs b/p2p/src/monitor/event.rs new file mode 100644 index 0000000..d320e37 --- /dev/null +++ b/p2p/src/monitor/event.rs @@ -0,0 +1,95 @@ +use karyon_net::Endpoint; + +use crate::PeerID; + +/// Defines connection-related events. +#[derive(Clone, Debug)] +pub enum ConnEvent { + Connected(Endpoint), + ConnectRetried(Endpoint), + ConnectFailed(Endpoint), + Accepted(Endpoint), + AcceptFailed, + Disconnected(Endpoint), + Listening(Endpoint), + ListenFailed(Endpoint), +} + +/// Defines `PP` events. +#[derive(Clone, Debug)] +pub enum PPEvent { + NewPeer(PeerID), + RemovePeer(PeerID), +} + +/// Defines `Discovery` events. +#[derive(Clone, Debug)] +pub enum DiscvEvent { + LookupStarted(Endpoint), + LookupFailed(Endpoint), + LookupSucceeded(Endpoint, usize), + RefreshStarted, +} + +impl ConnEvent { + pub(super) fn get_endpoint(&self) -> Option<&Endpoint> { + match self { + ConnEvent::Connected(endpoint) + | ConnEvent::ConnectRetried(endpoint) + | ConnEvent::ConnectFailed(endpoint) + | ConnEvent::Accepted(endpoint) + | ConnEvent::Disconnected(endpoint) + | ConnEvent::Listening(endpoint) + | ConnEvent::ListenFailed(endpoint) => Some(endpoint), + ConnEvent::AcceptFailed => None, + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + ConnEvent::Connected(_) => "Connected", + ConnEvent::ConnectRetried(_) => "ConnectRetried", + ConnEvent::ConnectFailed(_) => "ConnectFailed", + ConnEvent::Accepted(_) => "Accepted", + ConnEvent::AcceptFailed => "AcceptFailed", + ConnEvent::Disconnected(_) => "Disconnected", + ConnEvent::Listening(_) => "Listening", + ConnEvent::ListenFailed(_) => "ListenFailed", + } + } +} + +impl PPEvent { + pub(super) fn get_peer_id(&self) -> Option<&PeerID> { + match self { + PPEvent::NewPeer(peer_id) | PPEvent::RemovePeer(peer_id) => Some(peer_id), + } + } + pub(super) fn variant_name(&self) -> &'static str { + match self { + PPEvent::NewPeer(_) => "NewPeer", + PPEvent::RemovePeer(_) => "RemovePeer", + } + } +} + +impl DiscvEvent { + pub(super) fn get_endpoint_and_size(&self) -> (Option<&Endpoint>, Option) { + match self { + DiscvEvent::LookupStarted(endpoint) | DiscvEvent::LookupFailed(endpoint) => { + (Some(endpoint), None) + } + DiscvEvent::LookupSucceeded(endpoint, size) => (Some(endpoint), Some(*size)), + DiscvEvent::RefreshStarted => (None, None), + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + DiscvEvent::LookupStarted(_) => "LookupStarted", + DiscvEvent::LookupFailed(_) => "LookupFailed", + DiscvEvent::LookupSucceeded(_, _) => "LookupSucceeded", + DiscvEvent::RefreshStarted => "RefreshStarted", + } + } +} diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs new file mode 100644 index 0000000..34d252e --- /dev/null +++ b/p2p/src/monitor/mod.rs @@ -0,0 +1,209 @@ +mod event; + +use std::sync::Arc; + +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; + +use karyon_net::Endpoint; + +pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent}; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use crate::{Config, PeerID}; + +/// Responsible for network and system monitoring. +/// +/// It use pub-sub pattern to notify the subscribers with new events. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +/// +/// use karyon_p2p::{ +/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent, +/// }; +/// +/// async { +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); +/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); +/// +/// // Create a new Subscription +/// let monitor = backend.monitor(); +/// +/// let listener = monitor.register::().await; +/// +/// let new_event = listener.recv().await; +/// }; +/// ``` +pub struct Monitor { + event_sys: ArcEventSys, + config: Arc, +} + +impl Monitor { + /// Creates a new Monitor + pub(crate) fn new(config: Arc) -> Self { + Self { + event_sys: EventSys::new(), + config, + } + } + + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify(&self, event: E) { + if self.config.enable_monitor { + let event = event.to_struct(); + self.event_sys.emit(&event).await + } + } + + /// Registers a new event listener for the provided topic. + pub async fn register(&self) -> EventListener + where + E: Clone + EventValue + EventValueTopic, + { + self.event_sys.register(&E::topic()).await + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum MonitorTopic { + Connection, + PeerPool, + Discovery, +} + +pub(super) trait ToEventStruct: Sized { + type EventStruct: From + Clone + EventValueTopic + EventValue; + fn to_struct(self) -> Self::EventStruct { + self.into() + } +} + +impl ToEventStruct for ConnEvent { + type EventStruct = ConnectionEvent; +} + +impl ToEventStruct for PPEvent { + type EventStruct = PeerPoolEvent; +} + +impl ToEventStruct for DiscvEvent { + type EventStruct = DiscoveryEvent; +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct PeerPoolEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub peer_id: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DiscoveryEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub size: Option, +} + +impl From for ConnectionEvent { + fn from(event: ConnEvent) -> Self { + let endpoint = event.get_endpoint().cloned(); + Self { + endpoint, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for PeerPoolEvent { + fn from(event: PPEvent) -> Self { + let peer_id = event.get_peer_id().cloned(); + Self { + peer_id, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for DiscoveryEvent { + fn from(event: DiscvEvent) -> Self { + let (endpoint, size) = event.get_endpoint_and_size(); + Self { + endpoint: endpoint.cloned(), + size, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl EventValue for ConnectionEvent { + fn id() -> &'static str { + "ConnectionEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnectionEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Connection + } +} + +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool + } +} + +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery + } +} + +fn get_current_timestamp() -> i64 { + chrono::Utc::now().timestamp() +} -- cgit v1.2.3