aboutsummaryrefslogtreecommitdiff
path: root/p2p/src
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src')
-rw-r--r--p2p/src/backend.rs10
-rw-r--r--p2p/src/connector.rs8
-rw-r--r--p2p/src/discovery/lookup.rs15
-rw-r--r--p2p/src/discovery/refresh.rs18
-rw-r--r--p2p/src/listener.rs14
-rw-r--r--p2p/src/monitor.rs102
-rw-r--r--p2p/src/peer_pool.rs4
7 files changed, 102 insertions, 69 deletions
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 2f21b3e..c05a693 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -2,13 +2,13 @@ use std::sync::Arc;
use log::info;
-use karyon_core::{async_runtime::Executor, crypto::KeyPair, pubsub::Subscription};
+use karyon_core::{async_runtime::Executor, crypto::KeyPair};
use crate::{
config::Config,
connection::ConnQueue,
discovery::{ArcDiscovery, Discovery},
- monitor::{Monitor, MonitorEvent},
+ monitor::Monitor,
peer_pool::PeerPool,
protocol::{ArcProtocol, Protocol},
ArcPeer, PeerID, Result,
@@ -112,9 +112,9 @@ impl Backend {
self.discovery.outbound_slots.load()
}
- /// Subscribes to the monitor to receive network events.
- pub async fn monitor(&self) -> Subscription<MonitorEvent> {
- self.monitor.subscribe().await
+ /// Returns the monitor to receive system events.
+ pub fn monitor(&self) -> Arc<Monitor> {
+ self.monitor.clone()
}
/// Shuts down the Backend.
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index aea21ab..a44daea 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -87,7 +87,7 @@ impl Connector {
match self.dial(endpoint, peer_id).await {
Ok(conn) => {
self.monitor
- .notify(&ConnEvent::Connected(endpoint.clone()).into())
+ .notify(ConnEvent::Connected(endpoint.clone()))
.await;
return Ok(conn);
}
@@ -97,7 +97,7 @@ impl Connector {
}
self.monitor
- .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into())
+ .notify(ConnEvent::ConnectRetried(endpoint.clone()))
.await;
backoff.sleep().await;
@@ -107,7 +107,7 @@ impl Connector {
}
self.monitor
- .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into())
+ .notify(ConnEvent::ConnectFailed(endpoint.clone()))
.await;
self.connection_slots.remove().await;
@@ -135,7 +135,7 @@ impl Connector {
}
selfc
.monitor
- .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
+ .notify(ConnEvent::Disconnected(endpoint.clone()))
.await;
selfc.connection_slots.remove().await;
};
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index cff4610..613c3cd 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -134,7 +134,7 @@ impl LookupService {
pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
trace!("Lookup started {endpoint}");
self.monitor
- .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into())
+ .notify(DiscoveryEvent::LookupStarted(endpoint.clone()))
.await;
let mut random_peers = vec![];
@@ -143,7 +143,7 @@ impl LookupService {
.await
{
self.monitor
- .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into())
+ .notify(DiscoveryEvent::LookupFailed(endpoint.clone()))
.await;
return Err(err);
};
@@ -166,7 +166,10 @@ impl LookupService {
drop(table);
self.monitor
- .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into())
+ .notify(DiscoveryEvent::LookupSucceeded(
+ endpoint.clone(),
+ peer_buffer.len(),
+ ))
.await;
Ok(())
@@ -230,10 +233,14 @@ impl LookupService {
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
+ self.monitor
+ .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone())))
+ .await;
+
let result = self.handle_outbound(conn, target_peer_id).await;
self.monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))
.await;
self.outbound_slots.remove().await;
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 0c49ac2..430e749 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -119,9 +119,7 @@ impl RefreshService {
sleep(Duration::from_secs(self.config.refresh_interval)).await;
trace!("Start refreshing the routing table...");
- self.monitor
- .notify(&DiscoveryEvent::RefreshStarted.into())
- .await;
+ self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
for bucket in self.table.lock().await.iter() {
@@ -213,13 +211,15 @@ impl RefreshService {
let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {
Ok(c) => {
self.monitor
- .notify(&ConnEvent::Listening(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone())))
.await;
c
}
Err(err) => {
self.monitor
- .notify(&ConnEvent::ListenFailed(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed(
+ endpoint.clone(),
+ )))
.await;
return Err(err.into());
}
@@ -230,7 +230,9 @@ impl RefreshService {
let res = self.listen_to_ping_msg(&conn).await;
if let Err(err) = res {
trace!("Failed to handle ping msg {err}");
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ self.monitor
+ .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed))
+ .await;
}
}
}
@@ -239,7 +241,7 @@ impl RefreshService {
async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {
let (msg, endpoint) = conn.recv().await?;
self.monitor
- .notify(&ConnEvent::Accepted(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone())))
.await;
match msg {
@@ -251,7 +253,7 @@ impl RefreshService {
}
self.monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))
.await;
Ok(())
}
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 1abf79a..923eb18 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -72,15 +72,13 @@ impl Listener {
let listener = match self.listen(&endpoint).await {
Ok(listener) => {
self.monitor
- .notify(&ConnEvent::Listening(endpoint.clone()).into())
+ .notify(ConnEvent::Listening(endpoint.clone()))
.await;
listener
}
Err(err) => {
error!("Failed to listen on {endpoint}: {err}");
- self.monitor
- .notify(&ConnEvent::ListenFailed(endpoint).into())
- .await;
+ self.monitor.notify(ConnEvent::ListenFailed(endpoint)).await;
return Err(err);
}
};
@@ -117,20 +115,20 @@ impl Listener {
let endpoint = match c.peer_endpoint() {
Ok(ep) => ep,
Err(err) => {
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ self.monitor.notify(ConnEvent::AcceptFailed).await;
error!("Failed to accept a new connection: {err}");
continue;
}
};
self.monitor
- .notify(&ConnEvent::Accepted(endpoint.clone()).into())
+ .notify(ConnEvent::Accepted(endpoint.clone()))
.await;
(c, endpoint)
}
Err(err) => {
error!("Failed to accept a new connection: {err}");
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ self.monitor.notify(ConnEvent::AcceptFailed).await;
continue;
}
};
@@ -144,7 +142,7 @@ impl Listener {
}
selfc
.monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
+ .notify(ConnEvent::Disconnected(endpoint))
.await;
selfc.connection_slots.remove().await;
};
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs
index cbbd40c..bc3ea7f 100644
--- a/p2p/src/monitor.rs
+++ b/p2p/src/monitor.rs
@@ -2,7 +2,7 @@ use std::fmt;
use crate::PeerID;
-use karyon_core::pubsub::{ArcPublisher, Publisher, Subscription};
+use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
use karyon_net::Endpoint;
@@ -28,40 +28,54 @@ use karyon_net::Endpoint;
/// let backend = Backend::new(&key_pair, Config::default(), ex.into());
///
/// // Create a new Subscription
-/// let sub = backend.monitor().await;
+/// let monitor = backend.monitor();
///
-/// let event = sub.recv().await;
+/// let listener = monitor.conn_events().await;
+///
+/// let new_event = listener.recv().await;
/// };
/// ```
pub struct Monitor {
- inner: ArcPublisher<MonitorEvent>,
+ event_sys: ArcEventSys<MonitorTopic>,
}
impl Monitor {
/// Creates a new Monitor
- pub(crate) fn new() -> Monitor {
+ pub(crate) fn new() -> Self {
Self {
- inner: Publisher::new(),
+ event_sys: EventSys::new(),
}
}
- /// Sends a new monitor event to all subscribers.
- pub async fn notify(&self, event: &MonitorEvent) {
- self.inner.notify(event).await;
+ /// Sends a new monitor event to subscribers.
+ pub(crate) async fn notify<E>(&self, event: E)
+ where
+ E: EventValue + Clone + EventValueTopic<Topic = MonitorTopic>,
+ {
+ self.event_sys.emit(&event).await
+ }
+
+ /// Registers a new event listener for connection events.
+ pub async fn conn_events(&self) -> EventListener<MonitorTopic, ConnEvent> {
+ self.event_sys.register(&MonitorTopic::Conn).await
}
- /// Subscribes to listen to new events.
- pub async fn subscribe(&self) -> Subscription<MonitorEvent> {
- self.inner.subscribe().await
+ /// Registers a new event listener for peer pool events.
+ pub async fn peer_pool_events(&self) -> EventListener<MonitorTopic, PeerPoolEvent> {
+ self.event_sys.register(&MonitorTopic::PeerPool).await
+ }
+
+ /// Registers a new event listener for discovery events.
+ pub async fn discovery_events(&self) -> EventListener<MonitorTopic, DiscoveryEvent> {
+ self.event_sys.register(&MonitorTopic::Discovery).await
}
}
-/// Defines various type of event that can be monitored.
-#[derive(Clone, Debug)]
-pub enum MonitorEvent {
- Conn(ConnEvent),
- PeerPool(PeerPoolEvent),
- Discovery(DiscoveryEvent),
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
+pub enum MonitorTopic {
+ Conn,
+ PeerPool,
+ Discovery,
}
/// Defines connection-related events.
@@ -88,22 +102,12 @@ pub enum PeerPoolEvent {
#[derive(Clone, Debug)]
pub enum DiscoveryEvent {
LookupStarted(Endpoint),
+ Conn(ConnEvent),
LookupFailed(Endpoint),
LookupSucceeded(Endpoint, usize),
RefreshStarted,
}
-impl fmt::Display for MonitorEvent {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let val = match self {
- MonitorEvent::Conn(e) => format!("Connection Event: {e}"),
- MonitorEvent::PeerPool(e) => format!("PeerPool Event: {e}"),
- MonitorEvent::Discovery(e) => format!("Discovery Event: {e}"),
- };
- write!(f, "{}", val)
- }
-}
-
impl fmt::Display for ConnEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let val = match self {
@@ -134,6 +138,7 @@ impl fmt::Display for DiscoveryEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let val = match self {
DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"),
+ DiscoveryEvent::Conn(event) => format!("Connection event: {event}"),
DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"),
DiscoveryEvent::LookupSucceeded(endpoint, len) => {
format!("LookupSucceeded: {endpoint} {len}")
@@ -144,20 +149,41 @@ impl fmt::Display for DiscoveryEvent {
}
}
-impl From<ConnEvent> for MonitorEvent {
- fn from(val: ConnEvent) -> Self {
- MonitorEvent::Conn(val)
+impl EventValue for ConnEvent {
+ fn id() -> &'static str {
+ "ConnEvent"
+ }
+}
+
+impl EventValue for PeerPoolEvent {
+ fn id() -> &'static str {
+ "PeerPoolEvent"
+ }
+}
+
+impl EventValue for DiscoveryEvent {
+ fn id() -> &'static str {
+ "DiscoveryEvent"
+ }
+}
+
+impl EventValueTopic for ConnEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Conn
}
}
-impl From<PeerPoolEvent> for MonitorEvent {
- fn from(val: PeerPoolEvent) -> Self {
- MonitorEvent::PeerPool(val)
+impl EventValueTopic for PeerPoolEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::PeerPool
}
}
-impl From<DiscoveryEvent> for MonitorEvent {
- fn from(val: DiscoveryEvent) -> Self {
- MonitorEvent::Discovery(val)
+impl EventValueTopic for DiscoveryEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Discovery
}
}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 8b16ef5..07bb73d 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -193,7 +193,7 @@ impl PeerPool {
info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");
self.monitor
- .notify(&PeerPoolEvent::NewPeer(pid.clone()).into())
+ .notify(PeerPoolEvent::NewPeer(pid.clone()))
.await;
Ok(())
@@ -216,7 +216,7 @@ impl PeerPool {
peer.shutdown().await;
self.monitor
- .notify(&PeerPoolEvent::RemovePeer(pid.clone()).into())
+ .notify(PeerPoolEvent::RemovePeer(pid.clone()))
.await;
let endpoint = peer.remote_endpoint();