diff options
Diffstat (limited to 'p2p/src/monitor')
-rw-r--r-- | p2p/src/monitor/event.rs | 95 | ||||
-rw-r--r-- | p2p/src/monitor/mod.rs | 209 |
2 files changed, 304 insertions, 0 deletions
diff --git a/p2p/src/monitor/event.rs b/p2p/src/monitor/event.rs new file mode 100644 index 0000000..d320e37 --- /dev/null +++ b/p2p/src/monitor/event.rs @@ -0,0 +1,95 @@ +use karyon_net::Endpoint; + +use crate::PeerID; + +/// Defines connection-related events. +#[derive(Clone, Debug)] +pub enum ConnEvent { + Connected(Endpoint), + ConnectRetried(Endpoint), + ConnectFailed(Endpoint), + Accepted(Endpoint), + AcceptFailed, + Disconnected(Endpoint), + Listening(Endpoint), + ListenFailed(Endpoint), +} + +/// Defines `PP` events. +#[derive(Clone, Debug)] +pub enum PPEvent { + NewPeer(PeerID), + RemovePeer(PeerID), +} + +/// Defines `Discovery` events. +#[derive(Clone, Debug)] +pub enum DiscvEvent { + LookupStarted(Endpoint), + LookupFailed(Endpoint), + LookupSucceeded(Endpoint, usize), + RefreshStarted, +} + +impl ConnEvent { + pub(super) fn get_endpoint(&self) -> Option<&Endpoint> { + match self { + ConnEvent::Connected(endpoint) + | ConnEvent::ConnectRetried(endpoint) + | ConnEvent::ConnectFailed(endpoint) + | ConnEvent::Accepted(endpoint) + | ConnEvent::Disconnected(endpoint) + | ConnEvent::Listening(endpoint) + | ConnEvent::ListenFailed(endpoint) => Some(endpoint), + ConnEvent::AcceptFailed => None, + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + ConnEvent::Connected(_) => "Connected", + ConnEvent::ConnectRetried(_) => "ConnectRetried", + ConnEvent::ConnectFailed(_) => "ConnectFailed", + ConnEvent::Accepted(_) => "Accepted", + ConnEvent::AcceptFailed => "AcceptFailed", + ConnEvent::Disconnected(_) => "Disconnected", + ConnEvent::Listening(_) => "Listening", + ConnEvent::ListenFailed(_) => "ListenFailed", + } + } +} + +impl PPEvent { + pub(super) fn get_peer_id(&self) -> Option<&PeerID> { + match self { + PPEvent::NewPeer(peer_id) | PPEvent::RemovePeer(peer_id) => Some(peer_id), + } + } + pub(super) fn variant_name(&self) -> &'static str { + match self { + PPEvent::NewPeer(_) => "NewPeer", + PPEvent::RemovePeer(_) => "RemovePeer", + } + } +} + +impl DiscvEvent { + pub(super) fn get_endpoint_and_size(&self) -> (Option<&Endpoint>, Option<usize>) { + match self { + DiscvEvent::LookupStarted(endpoint) | DiscvEvent::LookupFailed(endpoint) => { + (Some(endpoint), None) + } + DiscvEvent::LookupSucceeded(endpoint, size) => (Some(endpoint), Some(*size)), + DiscvEvent::RefreshStarted => (None, None), + } + } + + pub(super) fn variant_name(&self) -> &'static str { + match self { + DiscvEvent::LookupStarted(_) => "LookupStarted", + DiscvEvent::LookupFailed(_) => "LookupFailed", + DiscvEvent::LookupSucceeded(_, _) => "LookupSucceeded", + DiscvEvent::RefreshStarted => "RefreshStarted", + } + } +} diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs new file mode 100644 index 0000000..34d252e --- /dev/null +++ b/p2p/src/monitor/mod.rs @@ -0,0 +1,209 @@ +mod event; + +use std::sync::Arc; + +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; + +use karyon_net::Endpoint; + +pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent}; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use crate::{Config, PeerID}; + +/// Responsible for network and system monitoring. +/// +/// It use pub-sub pattern to notify the subscribers with new events. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +/// +/// use karyon_p2p::{ +/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent, +/// }; +/// +/// async { +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); +/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); +/// +/// // Create a new Subscription +/// let monitor = backend.monitor(); +/// +/// let listener = monitor.register::<ConnectionEvent>().await; +/// +/// let new_event = listener.recv().await; +/// }; +/// ``` +pub struct Monitor { + event_sys: ArcEventSys<MonitorTopic>, + config: Arc<Config>, +} + +impl Monitor { + /// Creates a new Monitor + pub(crate) fn new(config: Arc<Config>) -> Self { + Self { + event_sys: EventSys::new(), + config, + } + } + + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) { + if self.config.enable_monitor { + let event = event.to_struct(); + self.event_sys.emit(&event).await + } + } + + /// Registers a new event listener for the provided topic. + pub async fn register<E>(&self) -> EventListener<MonitorTopic, E> + where + E: Clone + EventValue + EventValueTopic<Topic = MonitorTopic>, + { + self.event_sys.register(&E::topic()).await + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum MonitorTopic { + Connection, + PeerPool, + Discovery, +} + +pub(super) trait ToEventStruct: Sized { + type EventStruct: From<Self> + Clone + EventValueTopic<Topic = MonitorTopic> + EventValue; + fn to_struct(self) -> Self::EventStruct { + self.into() + } +} + +impl ToEventStruct for ConnEvent { + type EventStruct = ConnectionEvent; +} + +impl ToEventStruct for PPEvent { + type EventStruct = PeerPoolEvent; +} + +impl ToEventStruct for DiscvEvent { + type EventStruct = DiscoveryEvent; +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option<Endpoint>, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct PeerPoolEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub peer_id: Option<PeerID>, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DiscoveryEvent { + pub event: String, + pub date: i64, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub endpoint: Option<Endpoint>, + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub size: Option<usize>, +} + +impl From<ConnEvent> for ConnectionEvent { + fn from(event: ConnEvent) -> Self { + let endpoint = event.get_endpoint().cloned(); + Self { + endpoint, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From<PPEvent> for PeerPoolEvent { + fn from(event: PPEvent) -> Self { + let peer_id = event.get_peer_id().cloned(); + Self { + peer_id, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl From<DiscvEvent> for DiscoveryEvent { + fn from(event: DiscvEvent) -> Self { + let (endpoint, size) = event.get_endpoint_and_size(); + Self { + endpoint: endpoint.cloned(), + size, + event: event.variant_name().to_string(), + date: get_current_timestamp(), + } + } +} + +impl EventValue for ConnectionEvent { + fn id() -> &'static str { + "ConnectionEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnectionEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Connection + } +} + +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool + } +} + +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery + } +} + +fn get_current_timestamp() -> i64 { + chrono::Utc::now().timestamp() +} |