aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-24 13:25:39 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-24 13:25:39 +0200
commite3d1f4fd91a5f077fda8a1976e194c378ee166d0 (patch)
treec6d0c3d8777705202a88f838675eee85cd7e941d /p2p
parent135968d8f1379a6d2f32cbbc3e5b77a5f317a4d6 (diff)
p2p/monitor: use struct instead of enum for monitor events
Diffstat (limited to 'p2p')
-rw-r--r--p2p/src/discovery/lookup.rs8
-rw-r--r--p2p/src/discovery/refresh.rs4
-rw-r--r--p2p/src/monitor.rs198
-rw-r--r--p2p/src/monitor/event.rs95
-rw-r--r--p2p/src/monitor/mod.rs209
-rw-r--r--p2p/src/peer_pool.rs10
6 files changed, 313 insertions, 211 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 677bad7..a941986 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -20,7 +20,7 @@ use crate::{
get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg,
ShutdownMsg,
},
- monitor::{ConnEvent, DiscoveryEvent, Monitor},
+ monitor::{ConnEvent, DiscvEvent, Monitor},
routing_table::RoutingTable,
slots::ConnectionSlots,
version::version_match,
@@ -131,7 +131,7 @@ impl LookupService {
pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
trace!("Lookup started {endpoint}");
self.monitor
- .notify(DiscoveryEvent::LookupStarted(endpoint.clone()))
+ .notify(DiscvEvent::LookupStarted(endpoint.clone()))
.await;
let mut random_peers = vec![];
@@ -140,7 +140,7 @@ impl LookupService {
.await
{
self.monitor
- .notify(DiscoveryEvent::LookupFailed(endpoint.clone()))
+ .notify(DiscvEvent::LookupFailed(endpoint.clone()))
.await;
return Err(err);
};
@@ -161,7 +161,7 @@ impl LookupService {
}
self.monitor
- .notify(DiscoveryEvent::LookupSucceeded(
+ .notify(DiscvEvent::LookupSucceeded(
endpoint.clone(),
peer_buffer.len(),
))
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index eec6743..c1d222b 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -14,7 +14,7 @@ use karyon_net::{udp, Connection, Endpoint, Error as NetError};
use crate::{
codec::RefreshMsgCodec,
message::RefreshMsg,
- monitor::{ConnEvent, DiscoveryEvent, Monitor},
+ monitor::{ConnEvent, DiscvEvent, Monitor},
routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY},
Config, Error, Result,
};
@@ -116,7 +116,7 @@ impl RefreshService {
sleep(Duration::from_secs(self.config.refresh_interval)).await;
trace!("Start refreshing the routing table...");
- self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
+ self.monitor.notify(DiscvEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
for bucket in self.table.buckets() {
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs
deleted file mode 100644
index 4d6a46c..0000000
--- a/p2p/src/monitor.rs
+++ /dev/null
@@ -1,198 +0,0 @@
-use std::{fmt, sync::Arc};
-
-use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
-
-use karyon_net::Endpoint;
-
-#[cfg(feature = "serde")]
-use serde::{Deserialize, Serialize};
-
-use crate::{Config, PeerID};
-
-/// Responsible for network and system monitoring.
-///
-/// It use pub-sub pattern to notify the subscribers with new events.
-///
-/// # Example
-///
-/// ```
-/// use std::sync::Arc;
-///
-/// use smol::Executor;
-///
-/// use karyon_p2p::{Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}};
-///
-/// async {
-///
-/// // Create a new Executor
-/// let ex = Arc::new(Executor::new());
-///
-/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
-/// let backend = Backend::new(&key_pair, Config::default(), ex.into());
-///
-/// // Create a new Subscription
-/// let monitor = backend.monitor();
-///
-/// let listener = monitor.conn_events().await;
-///
-/// let new_event = listener.recv().await;
-/// };
-/// ```
-pub struct Monitor {
- event_sys: ArcEventSys<MonitorTopic>,
- config: Arc<Config>,
-}
-
-impl Monitor {
- /// Creates a new Monitor
- pub(crate) fn new(config: Arc<Config>) -> Self {
- Self {
- event_sys: EventSys::new(),
- config,
- }
- }
-
- /// Sends a new monitor event to subscribers.
- pub(crate) async fn notify<E>(&self, event: E)
- where
- E: EventValue + Clone + EventValueTopic<Topic = MonitorTopic>,
- {
- if self.config.enable_monitor {
- self.event_sys.emit(&event).await
- }
- }
-
- /// Registers a new event listener for connection events.
- pub async fn conn_events(&self) -> EventListener<MonitorTopic, ConnEvent> {
- self.event_sys.register(&MonitorTopic::Conn).await
- }
-
- /// Registers a new event listener for peer pool events.
- pub async fn peer_pool_events(&self) -> EventListener<MonitorTopic, PeerPoolEvent> {
- self.event_sys.register(&MonitorTopic::PeerPool).await
- }
-
- /// Registers a new event listener for discovery events.
- pub async fn discovery_events(&self) -> EventListener<MonitorTopic, DiscoveryEvent> {
- self.event_sys.register(&MonitorTopic::Discovery).await
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum MonitorTopic {
- Conn,
- PeerPool,
- Discovery,
-}
-
-/// Defines connection-related events.
-#[derive(Clone, Debug)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum ConnEvent {
- Connected(Endpoint),
- ConnectRetried(Endpoint),
- ConnectFailed(Endpoint),
- Accepted(Endpoint),
- AcceptFailed,
- Disconnected(Endpoint),
- Listening(Endpoint),
- ListenFailed(Endpoint),
-}
-
-/// Defines `PeerPool` events.
-#[derive(Clone, Debug)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum PeerPoolEvent {
- NewPeer(PeerID),
- RemovePeer(PeerID),
-}
-
-/// Defines `Discovery` events.
-#[derive(Clone, Debug)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum DiscoveryEvent {
- LookupStarted(Endpoint),
- LookupFailed(Endpoint),
- LookupSucceeded(Endpoint, usize),
- RefreshStarted,
-}
-
-impl fmt::Display for ConnEvent {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let val = match self {
- ConnEvent::Connected(endpoint) => format!("Connected: {endpoint}"),
- ConnEvent::ConnectFailed(endpoint) => format!("ConnectFailed: {endpoint}"),
- ConnEvent::ConnectRetried(endpoint) => format!("ConnectRetried: {endpoint}"),
- ConnEvent::AcceptFailed => "AcceptFailed".to_string(),
- ConnEvent::Accepted(endpoint) => format!("Accepted: {endpoint}"),
- ConnEvent::Disconnected(endpoint) => format!("Disconnected: {endpoint}"),
- ConnEvent::Listening(endpoint) => format!("Listening: {endpoint}"),
- ConnEvent::ListenFailed(endpoint) => format!("ListenFailed: {endpoint}"),
- };
- write!(f, "{}", val)
- }
-}
-
-impl fmt::Display for PeerPoolEvent {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let val = match self {
- PeerPoolEvent::NewPeer(pid) => format!("NewPeer: {pid}"),
- PeerPoolEvent::RemovePeer(pid) => format!("RemovePeer: {pid}"),
- };
- write!(f, "{}", val)
- }
-}
-
-impl fmt::Display for DiscoveryEvent {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let val = match self {
- DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"),
- DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"),
- DiscoveryEvent::LookupSucceeded(endpoint, len) => {
- format!("LookupSucceeded: {endpoint} {len}")
- }
- DiscoveryEvent::RefreshStarted => "RefreshStarted".to_string(),
- };
- write!(f, "{}", val)
- }
-}
-
-impl EventValue for ConnEvent {
- fn id() -> &'static str {
- "ConnEvent"
- }
-}
-
-impl EventValue for PeerPoolEvent {
- fn id() -> &'static str {
- "PeerPoolEvent"
- }
-}
-
-impl EventValue for DiscoveryEvent {
- fn id() -> &'static str {
- "DiscoveryEvent"
- }
-}
-
-impl EventValueTopic for ConnEvent {
- type Topic = MonitorTopic;
- fn topic() -> Self::Topic {
- MonitorTopic::Conn
- }
-}
-
-impl EventValueTopic for PeerPoolEvent {
- type Topic = MonitorTopic;
- fn topic() -> Self::Topic {
- MonitorTopic::PeerPool
- }
-}
-
-impl EventValueTopic for DiscoveryEvent {
- type Topic = MonitorTopic;
- fn topic() -> Self::Topic {
- MonitorTopic::Discovery
- }
-}
diff --git a/p2p/src/monitor/event.rs b/p2p/src/monitor/event.rs
new file mode 100644
index 0000000..d320e37
--- /dev/null
+++ b/p2p/src/monitor/event.rs
@@ -0,0 +1,95 @@
+use karyon_net::Endpoint;
+
+use crate::PeerID;
+
+/// Defines connection-related events.
+#[derive(Clone, Debug)]
+pub enum ConnEvent {
+ Connected(Endpoint),
+ ConnectRetried(Endpoint),
+ ConnectFailed(Endpoint),
+ Accepted(Endpoint),
+ AcceptFailed,
+ Disconnected(Endpoint),
+ Listening(Endpoint),
+ ListenFailed(Endpoint),
+}
+
+/// Defines `PP` events.
+#[derive(Clone, Debug)]
+pub enum PPEvent {
+ NewPeer(PeerID),
+ RemovePeer(PeerID),
+}
+
+/// Defines `Discovery` events.
+#[derive(Clone, Debug)]
+pub enum DiscvEvent {
+ LookupStarted(Endpoint),
+ LookupFailed(Endpoint),
+ LookupSucceeded(Endpoint, usize),
+ RefreshStarted,
+}
+
+impl ConnEvent {
+ pub(super) fn get_endpoint(&self) -> Option<&Endpoint> {
+ match self {
+ ConnEvent::Connected(endpoint)
+ | ConnEvent::ConnectRetried(endpoint)
+ | ConnEvent::ConnectFailed(endpoint)
+ | ConnEvent::Accepted(endpoint)
+ | ConnEvent::Disconnected(endpoint)
+ | ConnEvent::Listening(endpoint)
+ | ConnEvent::ListenFailed(endpoint) => Some(endpoint),
+ ConnEvent::AcceptFailed => None,
+ }
+ }
+
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ ConnEvent::Connected(_) => "Connected",
+ ConnEvent::ConnectRetried(_) => "ConnectRetried",
+ ConnEvent::ConnectFailed(_) => "ConnectFailed",
+ ConnEvent::Accepted(_) => "Accepted",
+ ConnEvent::AcceptFailed => "AcceptFailed",
+ ConnEvent::Disconnected(_) => "Disconnected",
+ ConnEvent::Listening(_) => "Listening",
+ ConnEvent::ListenFailed(_) => "ListenFailed",
+ }
+ }
+}
+
+impl PPEvent {
+ pub(super) fn get_peer_id(&self) -> Option<&PeerID> {
+ match self {
+ PPEvent::NewPeer(peer_id) | PPEvent::RemovePeer(peer_id) => Some(peer_id),
+ }
+ }
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ PPEvent::NewPeer(_) => "NewPeer",
+ PPEvent::RemovePeer(_) => "RemovePeer",
+ }
+ }
+}
+
+impl DiscvEvent {
+ pub(super) fn get_endpoint_and_size(&self) -> (Option<&Endpoint>, Option<usize>) {
+ match self {
+ DiscvEvent::LookupStarted(endpoint) | DiscvEvent::LookupFailed(endpoint) => {
+ (Some(endpoint), None)
+ }
+ DiscvEvent::LookupSucceeded(endpoint, size) => (Some(endpoint), Some(*size)),
+ DiscvEvent::RefreshStarted => (None, None),
+ }
+ }
+
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ DiscvEvent::LookupStarted(_) => "LookupStarted",
+ DiscvEvent::LookupFailed(_) => "LookupFailed",
+ DiscvEvent::LookupSucceeded(_, _) => "LookupSucceeded",
+ DiscvEvent::RefreshStarted => "RefreshStarted",
+ }
+ }
+}
diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs
new file mode 100644
index 0000000..34d252e
--- /dev/null
+++ b/p2p/src/monitor/mod.rs
@@ -0,0 +1,209 @@
+mod event;
+
+use std::sync::Arc;
+
+use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
+
+use karyon_net::Endpoint;
+
+pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
+
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};
+
+use crate::{Config, PeerID};
+
+/// Responsible for network and system monitoring.
+///
+/// It use pub-sub pattern to notify the subscribers with new events.
+///
+/// # Example
+///
+/// ```
+/// use std::sync::Arc;
+///
+/// use smol::Executor;
+///
+/// use karyon_p2p::{
+/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent,
+/// };
+///
+/// async {
+///
+/// // Create a new Executor
+/// let ex = Arc::new(Executor::new());
+///
+/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
+/// let backend = Backend::new(&key_pair, Config::default(), ex.into());
+///
+/// // Create a new Subscription
+/// let monitor = backend.monitor();
+///
+/// let listener = monitor.register::<ConnectionEvent>().await;
+///
+/// let new_event = listener.recv().await;
+/// };
+/// ```
+pub struct Monitor {
+ event_sys: ArcEventSys<MonitorTopic>,
+ config: Arc<Config>,
+}
+
+impl Monitor {
+ /// Creates a new Monitor
+ pub(crate) fn new(config: Arc<Config>) -> Self {
+ Self {
+ event_sys: EventSys::new(),
+ config,
+ }
+ }
+
+ /// Sends a new monitor event to subscribers.
+ pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
+ if self.config.enable_monitor {
+ let event = event.to_struct();
+ self.event_sys.emit(&event).await
+ }
+ }
+
+ /// Registers a new event listener for the provided topic.
+ pub async fn register<E>(&self) -> EventListener<MonitorTopic, E>
+ where
+ E: Clone + EventValue + EventValueTopic<Topic = MonitorTopic>,
+ {
+ self.event_sys.register(&E::topic()).await
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub enum MonitorTopic {
+ Connection,
+ PeerPool,
+ Discovery,
+}
+
+pub(super) trait ToEventStruct: Sized {
+ type EventStruct: From<Self> + Clone + EventValueTopic<Topic = MonitorTopic> + EventValue;
+ fn to_struct(self) -> Self::EventStruct {
+ self.into()
+ }
+}
+
+impl ToEventStruct for ConnEvent {
+ type EventStruct = ConnectionEvent;
+}
+
+impl ToEventStruct for PPEvent {
+ type EventStruct = PeerPoolEvent;
+}
+
+impl ToEventStruct for DiscvEvent {
+ type EventStruct = DiscoveryEvent;
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct ConnectionEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub endpoint: Option<Endpoint>,
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct PeerPoolEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub peer_id: Option<PeerID>,
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct DiscoveryEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub endpoint: Option<Endpoint>,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub size: Option<usize>,
+}
+
+impl From<ConnEvent> for ConnectionEvent {
+ fn from(event: ConnEvent) -> Self {
+ let endpoint = event.get_endpoint().cloned();
+ Self {
+ endpoint,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl From<PPEvent> for PeerPoolEvent {
+ fn from(event: PPEvent) -> Self {
+ let peer_id = event.get_peer_id().cloned();
+ Self {
+ peer_id,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl From<DiscvEvent> for DiscoveryEvent {
+ fn from(event: DiscvEvent) -> Self {
+ let (endpoint, size) = event.get_endpoint_and_size();
+ Self {
+ endpoint: endpoint.cloned(),
+ size,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl EventValue for ConnectionEvent {
+ fn id() -> &'static str {
+ "ConnectionEvent"
+ }
+}
+
+impl EventValue for PeerPoolEvent {
+ fn id() -> &'static str {
+ "PeerPoolEvent"
+ }
+}
+
+impl EventValue for DiscoveryEvent {
+ fn id() -> &'static str {
+ "DiscoveryEvent"
+ }
+}
+
+impl EventValueTopic for ConnectionEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Connection
+ }
+}
+
+impl EventValueTopic for PeerPoolEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::PeerPool
+ }
+}
+
+impl EventValueTopic for DiscoveryEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Discovery
+ }
+}
+
+fn get_current_timestamp() -> i64 {
+ chrono::Utc::now().timestamp()
+}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 79fd4b4..6c895a0 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -23,7 +23,7 @@ use crate::{
config::Config,
conn_queue::{ConnDirection, ConnQueue},
message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
- monitor::{Monitor, PeerPoolEvent},
+ monitor::{Monitor, PPEvent},
peer::{ArcPeer, Peer, PeerID},
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
@@ -192,9 +192,7 @@ impl PeerPool {
info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");
- self.monitor
- .notify(PeerPoolEvent::NewPeer(pid.clone()))
- .await;
+ self.monitor.notify(PPEvent::NewPeer(pid.clone())).await;
Ok(())
}
@@ -215,9 +213,7 @@ impl PeerPool {
peer.shutdown().await;
- self.monitor
- .notify(PeerPoolEvent::RemovePeer(pid.clone()))
- .await;
+ self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await;
let endpoint = peer.remote_endpoint();
let direction = peer.direction();