From 5df6812dd2254b871eb773dc626b89646ca87495 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 22 May 2024 18:02:51 +0200 Subject: p2p: monitor system use core::EventSys instead of pubsub pattern --- p2p/src/monitor.rs | 102 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 38 deletions(-) (limited to 'p2p/src/monitor.rs') diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index cbbd40c..bc3ea7f 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -2,7 +2,7 @@ use std::fmt; use crate::PeerID; -use karyon_core::pubsub::{ArcPublisher, Publisher, Subscription}; +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; use karyon_net::Endpoint; @@ -28,40 +28,54 @@ use karyon_net::Endpoint; /// let backend = Backend::new(&key_pair, Config::default(), ex.into()); /// /// // Create a new Subscription -/// let sub = backend.monitor().await; +/// let monitor = backend.monitor(); /// -/// let event = sub.recv().await; +/// let listener = monitor.conn_events().await; +/// +/// let new_event = listener.recv().await; /// }; /// ``` pub struct Monitor { - inner: ArcPublisher, + event_sys: ArcEventSys, } impl Monitor { /// Creates a new Monitor - pub(crate) fn new() -> Monitor { + pub(crate) fn new() -> Self { Self { - inner: Publisher::new(), + event_sys: EventSys::new(), } } - /// Sends a new monitor event to all subscribers. - pub async fn notify(&self, event: &MonitorEvent) { - self.inner.notify(event).await; + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify(&self, event: E) + where + E: EventValue + Clone + EventValueTopic, + { + self.event_sys.emit(&event).await + } + + /// Registers a new event listener for connection events. + pub async fn conn_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::Conn).await } - /// Subscribes to listen to new events. - pub async fn subscribe(&self) -> Subscription { - self.inner.subscribe().await + /// Registers a new event listener for peer pool events. + pub async fn peer_pool_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::PeerPool).await + } + + /// Registers a new event listener for discovery events. + pub async fn discovery_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::Discovery).await } } -/// Defines various type of event that can be monitored. -#[derive(Clone, Debug)] -pub enum MonitorEvent { - Conn(ConnEvent), - PeerPool(PeerPoolEvent), - Discovery(DiscoveryEvent), +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub enum MonitorTopic { + Conn, + PeerPool, + Discovery, } /// Defines connection-related events. @@ -88,22 +102,12 @@ pub enum PeerPoolEvent { #[derive(Clone, Debug)] pub enum DiscoveryEvent { LookupStarted(Endpoint), + Conn(ConnEvent), LookupFailed(Endpoint), LookupSucceeded(Endpoint, usize), RefreshStarted, } -impl fmt::Display for MonitorEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - MonitorEvent::Conn(e) => format!("Connection Event: {e}"), - MonitorEvent::PeerPool(e) => format!("PeerPool Event: {e}"), - MonitorEvent::Discovery(e) => format!("Discovery Event: {e}"), - }; - write!(f, "{}", val) - } -} - impl fmt::Display for ConnEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { @@ -134,6 +138,7 @@ impl fmt::Display for DiscoveryEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), + DiscoveryEvent::Conn(event) => format!("Connection event: {event}"), DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), DiscoveryEvent::LookupSucceeded(endpoint, len) => { format!("LookupSucceeded: {endpoint} {len}") @@ -144,20 +149,41 @@ impl fmt::Display for DiscoveryEvent { } } -impl From for MonitorEvent { - fn from(val: ConnEvent) -> Self { - MonitorEvent::Conn(val) +impl EventValue for ConnEvent { + fn id() -> &'static str { + "ConnEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Conn } } -impl From for MonitorEvent { - fn from(val: PeerPoolEvent) -> Self { - MonitorEvent::PeerPool(val) +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool } } -impl From for MonitorEvent { - fn from(val: DiscoveryEvent) -> Self { - MonitorEvent::Discovery(val) +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery } } -- cgit v1.2.3