mod event; use std::sync::Arc; use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; use karyon_net::Endpoint; pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use crate::{Config, PeerID}; /// Responsible for network and system monitoring. /// /// It use pub-sub pattern to notify the subscribers with new events. /// /// # Example /// /// ``` /// use std::sync::Arc; /// /// use smol::Executor; /// /// use karyon_p2p::{ /// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent, /// }; /// /// async { /// /// // Create a new Executor /// let ex = Arc::new(Executor::new()); /// /// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); /// let backend = Backend::new(&key_pair, Config::default(), ex.into()); /// /// // Create a new Subscription /// let monitor = backend.monitor(); /// /// let listener = monitor.register::().await; /// /// let new_event = listener.recv().await; /// }; /// ``` pub struct Monitor { event_sys: ArcEventSys, config: Arc, } impl Monitor { /// Creates a new Monitor pub(crate) fn new(config: Arc) -> Self { Self { event_sys: EventSys::new(), config, } } /// Sends a new monitor event to subscribers. pub(crate) async fn notify(&self, event: E) { if self.config.enable_monitor { let event = event.to_struct(); self.event_sys.emit(&event).await } } /// Registers a new event listener for the provided topic. pub async fn register(&self) -> EventListener where E: Clone + EventValue + EventValueTopic, { self.event_sys.register(&E::topic()).await } } #[derive(Clone, Debug, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum MonitorTopic { Connection, PeerPool, Discovery, } pub(super) trait ToEventStruct: Sized { type EventStruct: From + Clone + EventValueTopic + EventValue; fn to_struct(self) -> Self::EventStruct { self.into() } } impl ToEventStruct for ConnEvent { type EventStruct = ConnectionEvent; } impl ToEventStruct for PPEvent { type EventStruct = PeerPoolEvent; } impl ToEventStruct for DiscvEvent { type EventStruct = DiscoveryEvent; } #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ConnectionEvent { pub event: String, pub date: i64, #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub endpoint: Option, } #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct PeerPoolEvent { pub event: String, pub date: i64, #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub peer_id: Option, } #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct DiscoveryEvent { pub event: String, pub date: i64, #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub endpoint: Option, #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub size: Option, } impl From for ConnectionEvent { fn from(event: ConnEvent) -> Self { let endpoint = event.get_endpoint().cloned(); Self { endpoint, event: event.variant_name().to_string(), date: get_current_timestamp(), } } } impl From for PeerPoolEvent { fn from(event: PPEvent) -> Self { let peer_id = event.get_peer_id().cloned(); Self { peer_id, event: event.variant_name().to_string(), date: get_current_timestamp(), } } } impl From for DiscoveryEvent { fn from(event: DiscvEvent) -> Self { let (endpoint, size) = event.get_endpoint_and_size(); Self { endpoint: endpoint.cloned(), size, event: event.variant_name().to_string(), date: get_current_timestamp(), } } } impl EventValue for ConnectionEvent { fn id() -> &'static str { "ConnectionEvent" } } impl EventValue for PeerPoolEvent { fn id() -> &'static str { "PeerPoolEvent" } } impl EventValue for DiscoveryEvent { fn id() -> &'static str { "DiscoveryEvent" } } impl EventValueTopic for ConnectionEvent { type Topic = MonitorTopic; fn topic() -> Self::Topic { MonitorTopic::Connection } } impl EventValueTopic for PeerPoolEvent { type Topic = MonitorTopic; fn topic() -> Self::Topic { MonitorTopic::PeerPool } } impl EventValueTopic for DiscoveryEvent { type Topic = MonitorTopic; fn topic() -> Self::Topic { MonitorTopic::Discovery } } fn get_current_timestamp() -> i64 { chrono::Utc::now().timestamp() }