From 5df6812dd2254b871eb773dc626b89646ca87495 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 22 May 2024 18:02:51 +0200 Subject: p2p: monitor system use core::EventSys instead of pubsub pattern --- p2p/src/backend.rs | 10 ++--- p2p/src/connector.rs | 8 ++-- p2p/src/discovery/lookup.rs | 15 +++++-- p2p/src/discovery/refresh.rs | 18 ++++---- p2p/src/listener.rs | 14 +++--- p2p/src/monitor.rs | 102 +++++++++++++++++++++++++++---------------- p2p/src/peer_pool.rs | 4 +- 7 files changed, 102 insertions(+), 69 deletions(-) (limited to 'p2p/src') diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 2f21b3e..c05a693 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,13 +2,13 @@ use std::sync::Arc; use log::info; -use karyon_core::{async_runtime::Executor, crypto::KeyPair, pubsub::Subscription}; +use karyon_core::{async_runtime::Executor, crypto::KeyPair}; use crate::{ config::Config, connection::ConnQueue, discovery::{ArcDiscovery, Discovery}, - monitor::{Monitor, MonitorEvent}, + monitor::Monitor, peer_pool::PeerPool, protocol::{ArcProtocol, Protocol}, ArcPeer, PeerID, Result, @@ -112,9 +112,9 @@ impl Backend { self.discovery.outbound_slots.load() } - /// Subscribes to the monitor to receive network events. - pub async fn monitor(&self) -> Subscription { - self.monitor.subscribe().await + /// Returns the monitor to receive system events. + pub fn monitor(&self) -> Arc { + self.monitor.clone() } /// Shuts down the Backend. diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index aea21ab..a44daea 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -87,7 +87,7 @@ impl Connector { match self.dial(endpoint, peer_id).await { Ok(conn) => { self.monitor - .notify(&ConnEvent::Connected(endpoint.clone()).into()) + .notify(ConnEvent::Connected(endpoint.clone())) .await; return Ok(conn); } @@ -97,7 +97,7 @@ impl Connector { } self.monitor - .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into()) + .notify(ConnEvent::ConnectRetried(endpoint.clone())) .await; backoff.sleep().await; @@ -107,7 +107,7 @@ impl Connector { } self.monitor - .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into()) + .notify(ConnEvent::ConnectFailed(endpoint.clone())) .await; self.connection_slots.remove().await; @@ -135,7 +135,7 @@ impl Connector { } selfc .monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) + .notify(ConnEvent::Disconnected(endpoint.clone())) .await; selfc.connection_slots.remove().await; }; diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index cff4610..613c3cd 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -134,7 +134,7 @@ impl LookupService { pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option) -> Result<()> { trace!("Lookup started {endpoint}"); self.monitor - .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) + .notify(DiscoveryEvent::LookupStarted(endpoint.clone())) .await; let mut random_peers = vec![]; @@ -143,7 +143,7 @@ impl LookupService { .await { self.monitor - .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) + .notify(DiscoveryEvent::LookupFailed(endpoint.clone())) .await; return Err(err); }; @@ -166,7 +166,10 @@ impl LookupService { drop(table); self.monitor - .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) + .notify(DiscoveryEvent::LookupSucceeded( + endpoint.clone(), + peer_buffer.len(), + )) .await; Ok(()) @@ -230,10 +233,14 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result> { let conn = self.connector.connect(&endpoint, &peer_id).await?; + self.monitor + .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone()))) + .await; + let result = self.handle_outbound(conn, target_peer_id).await; self.monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) .await; self.outbound_slots.remove().await; diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 0c49ac2..430e749 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -119,9 +119,7 @@ impl RefreshService { sleep(Duration::from_secs(self.config.refresh_interval)).await; trace!("Start refreshing the routing table..."); - self.monitor - .notify(&DiscoveryEvent::RefreshStarted.into()) - .await; + self.monitor.notify(DiscoveryEvent::RefreshStarted).await; let mut entries: Vec = vec![]; for bucket in self.table.lock().await.iter() { @@ -213,13 +211,15 @@ impl RefreshService { let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone()))) .await; c } Err(err) => { self.monitor - .notify(&ConnEvent::ListenFailed(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed( + endpoint.clone(), + ))) .await; return Err(err.into()); } @@ -230,7 +230,9 @@ impl RefreshService { let res = self.listen_to_ping_msg(&conn).await; if let Err(err) = res { trace!("Failed to handle ping msg {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + self.monitor + .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed)) + .await; } } } @@ -239,7 +241,7 @@ impl RefreshService { async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { let (msg, endpoint) = conn.recv().await?; self.monitor - .notify(&ConnEvent::Accepted(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone()))) .await; match msg { @@ -251,7 +253,7 @@ impl RefreshService { } self.monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) .await; Ok(()) } diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 1abf79a..923eb18 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -72,15 +72,13 @@ impl Listener { let listener = match self.listen(&endpoint).await { Ok(listener) => { self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) + .notify(ConnEvent::Listening(endpoint.clone())) .await; listener } Err(err) => { error!("Failed to listen on {endpoint}: {err}"); - self.monitor - .notify(&ConnEvent::ListenFailed(endpoint).into()) - .await; + self.monitor.notify(ConnEvent::ListenFailed(endpoint)).await; return Err(err); } }; @@ -117,20 +115,20 @@ impl Listener { let endpoint = match c.peer_endpoint() { Ok(ep) => ep, Err(err) => { - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + self.monitor.notify(ConnEvent::AcceptFailed).await; error!("Failed to accept a new connection: {err}"); continue; } }; self.monitor - .notify(&ConnEvent::Accepted(endpoint.clone()).into()) + .notify(ConnEvent::Accepted(endpoint.clone())) .await; (c, endpoint) } Err(err) => { error!("Failed to accept a new connection: {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + self.monitor.notify(ConnEvent::AcceptFailed).await; continue; } }; @@ -144,7 +142,7 @@ impl Listener { } selfc .monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) + .notify(ConnEvent::Disconnected(endpoint)) .await; selfc.connection_slots.remove().await; }; diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index cbbd40c..bc3ea7f 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -2,7 +2,7 @@ use std::fmt; use crate::PeerID; -use karyon_core::pubsub::{ArcPublisher, Publisher, Subscription}; +use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; use karyon_net::Endpoint; @@ -28,40 +28,54 @@ use karyon_net::Endpoint; /// let backend = Backend::new(&key_pair, Config::default(), ex.into()); /// /// // Create a new Subscription -/// let sub = backend.monitor().await; +/// let monitor = backend.monitor(); /// -/// let event = sub.recv().await; +/// let listener = monitor.conn_events().await; +/// +/// let new_event = listener.recv().await; /// }; /// ``` pub struct Monitor { - inner: ArcPublisher, + event_sys: ArcEventSys, } impl Monitor { /// Creates a new Monitor - pub(crate) fn new() -> Monitor { + pub(crate) fn new() -> Self { Self { - inner: Publisher::new(), + event_sys: EventSys::new(), } } - /// Sends a new monitor event to all subscribers. - pub async fn notify(&self, event: &MonitorEvent) { - self.inner.notify(event).await; + /// Sends a new monitor event to subscribers. + pub(crate) async fn notify(&self, event: E) + where + E: EventValue + Clone + EventValueTopic, + { + self.event_sys.emit(&event).await + } + + /// Registers a new event listener for connection events. + pub async fn conn_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::Conn).await } - /// Subscribes to listen to new events. - pub async fn subscribe(&self) -> Subscription { - self.inner.subscribe().await + /// Registers a new event listener for peer pool events. + pub async fn peer_pool_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::PeerPool).await + } + + /// Registers a new event listener for discovery events. + pub async fn discovery_events(&self) -> EventListener { + self.event_sys.register(&MonitorTopic::Discovery).await } } -/// Defines various type of event that can be monitored. -#[derive(Clone, Debug)] -pub enum MonitorEvent { - Conn(ConnEvent), - PeerPool(PeerPoolEvent), - Discovery(DiscoveryEvent), +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub enum MonitorTopic { + Conn, + PeerPool, + Discovery, } /// Defines connection-related events. @@ -88,22 +102,12 @@ pub enum PeerPoolEvent { #[derive(Clone, Debug)] pub enum DiscoveryEvent { LookupStarted(Endpoint), + Conn(ConnEvent), LookupFailed(Endpoint), LookupSucceeded(Endpoint, usize), RefreshStarted, } -impl fmt::Display for MonitorEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - MonitorEvent::Conn(e) => format!("Connection Event: {e}"), - MonitorEvent::PeerPool(e) => format!("PeerPool Event: {e}"), - MonitorEvent::Discovery(e) => format!("Discovery Event: {e}"), - }; - write!(f, "{}", val) - } -} - impl fmt::Display for ConnEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { @@ -134,6 +138,7 @@ impl fmt::Display for DiscoveryEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let val = match self { DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), + DiscoveryEvent::Conn(event) => format!("Connection event: {event}"), DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), DiscoveryEvent::LookupSucceeded(endpoint, len) => { format!("LookupSucceeded: {endpoint} {len}") @@ -144,20 +149,41 @@ impl fmt::Display for DiscoveryEvent { } } -impl From for MonitorEvent { - fn from(val: ConnEvent) -> Self { - MonitorEvent::Conn(val) +impl EventValue for ConnEvent { + fn id() -> &'static str { + "ConnEvent" + } +} + +impl EventValue for PeerPoolEvent { + fn id() -> &'static str { + "PeerPoolEvent" + } +} + +impl EventValue for DiscoveryEvent { + fn id() -> &'static str { + "DiscoveryEvent" + } +} + +impl EventValueTopic for ConnEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Conn } } -impl From for MonitorEvent { - fn from(val: PeerPoolEvent) -> Self { - MonitorEvent::PeerPool(val) +impl EventValueTopic for PeerPoolEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::PeerPool } } -impl From for MonitorEvent { - fn from(val: DiscoveryEvent) -> Self { - MonitorEvent::Discovery(val) +impl EventValueTopic for DiscoveryEvent { + type Topic = MonitorTopic; + fn topic() -> Self::Topic { + MonitorTopic::Discovery } } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 8b16ef5..07bb73d 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -193,7 +193,7 @@ impl PeerPool { info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); self.monitor - .notify(&PeerPoolEvent::NewPeer(pid.clone()).into()) + .notify(PeerPoolEvent::NewPeer(pid.clone())) .await; Ok(()) @@ -216,7 +216,7 @@ impl PeerPool { peer.shutdown().await; self.monitor - .notify(&PeerPoolEvent::RemovePeer(pid.clone()).into()) + .notify(PeerPoolEvent::RemovePeer(pid.clone())) .await; let endpoint = peer.remote_endpoint(); -- cgit v1.2.3