From e3d1f4fd91a5f077fda8a1976e194c378ee166d0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 13:25:39 +0200 Subject: p2p/monitor: use struct instead of enum for monitor events --- p2p/src/monitor/mod.rs | 209 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 p2p/src/monitor/mod.rs (limited to 'p2p/src/monitor/mod.rs') diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs new file mode 100644 index 0000000..34d252e --- /dev/null +++ b/p2p/src/monitor/mod.rs @@ -0,0 +1,209 @@ +mod event; + +use std::sync::Arc; + +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; + +use karyon_net::Endpoint; + +pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent}; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use crate::{Config, PeerID}; + +/// Responsible for network and system monitoring. +/// +/// It use pub-sub pattern to notify the subscribers with new events. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +/// +/// use karyon_p2p::{ +/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent, +/// }; +/// +/// async { +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); +/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); +/// +/// // Create a new Subscription +/// let monitor = backend.monitor(); +/// +/// let listener = monitor.register::().await; +/// +/// let new_event = listener.recv().await; +/// }; +/// ``` +pub struct Monitor { + event_sys: ArcEventSys, + config: Arc, +} + +impl Monitor { + /// Creates a new Monitor + pub(crate) fn new(config: Arc) -> Self { + Self { + event_sys: EventSys::new(), + config, + } + } + + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify(&self, event: E) { + if self.config.enable_monitor { + let event = event.to_struct(); + self.event_sys.emit(&event).await + } + } + + /// Registers a new event listener for the provided topic. + pub async fn register(&self) -> EventListener + where + E: Clone + EventValue + EventValueTopic, + { + self.event_sys.register(&E::topic()).await + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum MonitorTopic { + Connection, + PeerPool, + Discovery, +} + +pub(super) trait ToEventStruct: Sized { + type EventStruct: From + Clone + EventValueTopic + EventValue; + fn to_struct(self) -> Self::EventStruct { + self.into() + } +} + +impl ToEventStruct for ConnEvent { + type EventStruct = ConnectionEvent; +} + +impl ToEventStruct for PPEvent { + type EventStruct = PeerPoolEvent; +} + +impl ToEventStruct for DiscvEvent { + type EventStruct = DiscoveryEvent; +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct PeerPoolEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub peer_id: Option, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DiscoveryEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub size: Option, +} + +impl From for ConnectionEvent { + fn from(event: ConnEvent) -> Self { + let endpoint = event.get_endpoint().cloned(); + Self { + endpoint, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for PeerPoolEvent { + fn from(event: PPEvent) -> Self { + let peer_id = event.get_peer_id().cloned(); + Self { + peer_id, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From for DiscoveryEvent { + fn from(event: DiscvEvent) -> Self { + let (endpoint, size) = event.get_endpoint_and_size(); + Self { + endpoint: endpoint.cloned(), + size, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl EventValue for ConnectionEvent { + fn id() -> &'static str { + "ConnectionEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnectionEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Connection + } +} + +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool + } +} + +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery + } +} + +fn get_current_timestamp() -> i64 { + chrono::Utc::now().timestamp() +} -- cgit v1.2.3