diff options
author | hozan23 <hozan23@karyontech.net> | 2024-05-22 18:02:51 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-05-22 18:02:51 +0200 |
commit | 5df6812dd2254b871eb773dc626b89646ca87495 (patch) | |
tree | 18828be9c86aba205d30086f705f2c6e2aeee975 /p2p/src/monitor.rs | |
parent | c2860d3266aa1233787400b163d527fdc7dafe61 (diff) |
p2p: monitor system use core::EventSys instead of pubsub pattern
Diffstat (limited to 'p2p/src/monitor.rs')
-rw-r--r-- | p2p/src/monitor.rs | 102 |
1 files changed, 64 insertions, 38 deletions
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index cbbd40c..bc3ea7f 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -2,7 +2,7 @@ use std::fmt; use crate::PeerID; -use karyon_core::pubsub::{ArcPublisher, Publisher, Subscription}; +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; use karyon_net::Endpoint; @@ -28,40 +28,54 @@ use karyon_net::Endpoint; /// let backend = Backend::new(&key_pair, Config::default(), ex.into()); /// /// // Create a new Subscription -/// let sub = backend.monitor().await; +/// let monitor = backend.monitor(); /// -/// let event = sub.recv().await; +/// let listener = monitor.conn_events().await; +/// +/// let new_event = listener.recv().await; /// }; /// ``` pub struct Monitor { - inner: ArcPublisher<MonitorEvent>, + event_sys: ArcEventSys<MonitorTopic>, } impl Monitor { /// Creates a new Monitor - pub(crate) fn new() -> Monitor { + pub(crate) fn new() -> Self { Self { - inner: Publisher::new(), + event_sys: EventSys::new(), } } - /// Sends a new monitor event to all subscribers. - pub async fn notify(&self, event: &MonitorEvent) { - self.inner.notify(event).await; + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify<E>(&self, event: E) + where + E: EventValue + Clone + EventValueTopic<Topic = MonitorTopic>, + { + self.event_sys.emit(&event).await + } + + /// Registers a new event listener for connection events. + pub async fn conn_events(&self) -> EventListener<MonitorTopic, ConnEvent> { + self.event_sys.register(&MonitorTopic::Conn).await } - /// Subscribes to listen to new events. - pub async fn subscribe(&self) -> Subscription<MonitorEvent> { - self.inner.subscribe().await + /// Registers a new event listener for peer pool events. + pub async fn peer_pool_events(&self) -> EventListener<MonitorTopic, PeerPoolEvent> { + self.event_sys.register(&MonitorTopic::PeerPool).await + } + + /// Registers a new event listener for discovery events. + pub async fn discovery_events(&self) -> EventListener<MonitorTopic, DiscoveryEvent> { + self.event_sys.register(&MonitorTopic::Discovery).await } } -/// Defines various type of event that can be monitored. -#[derive(Clone, Debug)] -pub enum MonitorEvent { - Conn(ConnEvent), - PeerPool(PeerPoolEvent), - Discovery(DiscoveryEvent), +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub enum MonitorTopic { + Conn, + PeerPool, + Discovery, } /// Defines connection-related events. @@ -88,22 +102,12 @@ pub enum PeerPoolEvent { #[derive(Clone, Debug)] pub enum DiscoveryEvent { LookupStarted(Endpoint), + Conn(ConnEvent), LookupFailed(Endpoint), LookupSucceeded(Endpoint, usize), RefreshStarted, } -impl fmt::Display for MonitorEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - MonitorEvent::Conn(e) => format!("Connection Event: {e}"), - MonitorEvent::PeerPool(e) => format!("PeerPool Event: {e}"), - MonitorEvent::Discovery(e) => format!("Discovery Event: {e}"), - }; - write!(f, "{}", val) - } -} - impl fmt::Display for ConnEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { @@ -134,6 +138,7 @@ impl fmt::Display for DiscoveryEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), + DiscoveryEvent::Conn(event) => format!("Connection event: {event}"), DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), DiscoveryEvent::LookupSucceeded(endpoint, len) => { format!("LookupSucceeded: {endpoint} {len}") @@ -144,20 +149,41 @@ impl fmt::Display for DiscoveryEvent { } } -impl From<ConnEvent> for MonitorEvent { - fn from(val: ConnEvent) -> Self { - MonitorEvent::Conn(val) +impl EventValue for ConnEvent { + fn id() -> &'static str { + "ConnEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Conn } } -impl From<PeerPoolEvent> for MonitorEvent { - fn from(val: PeerPoolEvent) -> Self { - MonitorEvent::PeerPool(val) +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool } } -impl From<DiscoveryEvent> for MonitorEvent { - fn from(val: DiscoveryEvent) -> Self { - MonitorEvent::Discovery(val) +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery } } |