aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/monitor
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/monitor')
-rw-r--r--p2p/src/monitor/event.rs95
-rw-r--r--p2p/src/monitor/mod.rs209
2 files changed, 304 insertions, 0 deletions
diff --git a/p2p/src/monitor/event.rs b/p2p/src/monitor/event.rs
new file mode 100644
index 0000000..d320e37
--- /dev/null
+++ b/p2p/src/monitor/event.rs
@@ -0,0 +1,95 @@
+use karyon_net::Endpoint;
+
+use crate::PeerID;
+
+/// Defines connection-related events.
+#[derive(Clone, Debug)]
+pub enum ConnEvent {
+ Connected(Endpoint),
+ ConnectRetried(Endpoint),
+ ConnectFailed(Endpoint),
+ Accepted(Endpoint),
+ AcceptFailed,
+ Disconnected(Endpoint),
+ Listening(Endpoint),
+ ListenFailed(Endpoint),
+}
+
+/// Defines `PP` events.
+#[derive(Clone, Debug)]
+pub enum PPEvent {
+ NewPeer(PeerID),
+ RemovePeer(PeerID),
+}
+
+/// Defines `Discovery` events.
+#[derive(Clone, Debug)]
+pub enum DiscvEvent {
+ LookupStarted(Endpoint),
+ LookupFailed(Endpoint),
+ LookupSucceeded(Endpoint, usize),
+ RefreshStarted,
+}
+
+impl ConnEvent {
+ pub(super) fn get_endpoint(&self) -> Option<&Endpoint> {
+ match self {
+ ConnEvent::Connected(endpoint)
+ | ConnEvent::ConnectRetried(endpoint)
+ | ConnEvent::ConnectFailed(endpoint)
+ | ConnEvent::Accepted(endpoint)
+ | ConnEvent::Disconnected(endpoint)
+ | ConnEvent::Listening(endpoint)
+ | ConnEvent::ListenFailed(endpoint) => Some(endpoint),
+ ConnEvent::AcceptFailed => None,
+ }
+ }
+
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ ConnEvent::Connected(_) => "Connected",
+ ConnEvent::ConnectRetried(_) => "ConnectRetried",
+ ConnEvent::ConnectFailed(_) => "ConnectFailed",
+ ConnEvent::Accepted(_) => "Accepted",
+ ConnEvent::AcceptFailed => "AcceptFailed",
+ ConnEvent::Disconnected(_) => "Disconnected",
+ ConnEvent::Listening(_) => "Listening",
+ ConnEvent::ListenFailed(_) => "ListenFailed",
+ }
+ }
+}
+
+impl PPEvent {
+ pub(super) fn get_peer_id(&self) -> Option<&PeerID> {
+ match self {
+ PPEvent::NewPeer(peer_id) | PPEvent::RemovePeer(peer_id) => Some(peer_id),
+ }
+ }
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ PPEvent::NewPeer(_) => "NewPeer",
+ PPEvent::RemovePeer(_) => "RemovePeer",
+ }
+ }
+}
+
+impl DiscvEvent {
+ pub(super) fn get_endpoint_and_size(&self) -> (Option<&Endpoint>, Option<usize>) {
+ match self {
+ DiscvEvent::LookupStarted(endpoint) | DiscvEvent::LookupFailed(endpoint) => {
+ (Some(endpoint), None)
+ }
+ DiscvEvent::LookupSucceeded(endpoint, size) => (Some(endpoint), Some(*size)),
+ DiscvEvent::RefreshStarted => (None, None),
+ }
+ }
+
+ pub(super) fn variant_name(&self) -> &'static str {
+ match self {
+ DiscvEvent::LookupStarted(_) => "LookupStarted",
+ DiscvEvent::LookupFailed(_) => "LookupFailed",
+ DiscvEvent::LookupSucceeded(_, _) => "LookupSucceeded",
+ DiscvEvent::RefreshStarted => "RefreshStarted",
+ }
+ }
+}
diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs
new file mode 100644
index 0000000..34d252e
--- /dev/null
+++ b/p2p/src/monitor/mod.rs
@@ -0,0 +1,209 @@
+mod event;
+
+use std::sync::Arc;
+
+use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
+
+use karyon_net::Endpoint;
+
+pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
+
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};
+
+use crate::{Config, PeerID};
+
+/// Responsible for network and system monitoring.
+///
+/// It use pub-sub pattern to notify the subscribers with new events.
+///
+/// # Example
+///
+/// ```
+/// use std::sync::Arc;
+///
+/// use smol::Executor;
+///
+/// use karyon_p2p::{
+/// Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent,
+/// };
+///
+/// async {
+///
+/// // Create a new Executor
+/// let ex = Arc::new(Executor::new());
+///
+/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
+/// let backend = Backend::new(&key_pair, Config::default(), ex.into());
+///
+/// // Create a new Subscription
+/// let monitor = backend.monitor();
+///
+/// let listener = monitor.register::<ConnectionEvent>().await;
+///
+/// let new_event = listener.recv().await;
+/// };
+/// ```
+pub struct Monitor {
+ event_sys: ArcEventSys<MonitorTopic>,
+ config: Arc<Config>,
+}
+
+impl Monitor {
+ /// Creates a new Monitor
+ pub(crate) fn new(config: Arc<Config>) -> Self {
+ Self {
+ event_sys: EventSys::new(),
+ config,
+ }
+ }
+
+ /// Sends a new monitor event to subscribers.
+ pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
+ if self.config.enable_monitor {
+ let event = event.to_struct();
+ self.event_sys.emit(&event).await
+ }
+ }
+
+ /// Registers a new event listener for the provided topic.
+ pub async fn register<E>(&self) -> EventListener<MonitorTopic, E>
+ where
+ E: Clone + EventValue + EventValueTopic<Topic = MonitorTopic>,
+ {
+ self.event_sys.register(&E::topic()).await
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub enum MonitorTopic {
+ Connection,
+ PeerPool,
+ Discovery,
+}
+
+pub(super) trait ToEventStruct: Sized {
+ type EventStruct: From<Self> + Clone + EventValueTopic<Topic = MonitorTopic> + EventValue;
+ fn to_struct(self) -> Self::EventStruct {
+ self.into()
+ }
+}
+
+impl ToEventStruct for ConnEvent {
+ type EventStruct = ConnectionEvent;
+}
+
+impl ToEventStruct for PPEvent {
+ type EventStruct = PeerPoolEvent;
+}
+
+impl ToEventStruct for DiscvEvent {
+ type EventStruct = DiscoveryEvent;
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct ConnectionEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub endpoint: Option<Endpoint>,
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct PeerPoolEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub peer_id: Option<PeerID>,
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct DiscoveryEvent {
+ pub event: String,
+ pub date: i64,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub endpoint: Option<Endpoint>,
+ #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
+ pub size: Option<usize>,
+}
+
+impl From<ConnEvent> for ConnectionEvent {
+ fn from(event: ConnEvent) -> Self {
+ let endpoint = event.get_endpoint().cloned();
+ Self {
+ endpoint,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl From<PPEvent> for PeerPoolEvent {
+ fn from(event: PPEvent) -> Self {
+ let peer_id = event.get_peer_id().cloned();
+ Self {
+ peer_id,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl From<DiscvEvent> for DiscoveryEvent {
+ fn from(event: DiscvEvent) -> Self {
+ let (endpoint, size) = event.get_endpoint_and_size();
+ Self {
+ endpoint: endpoint.cloned(),
+ size,
+ event: event.variant_name().to_string(),
+ date: get_current_timestamp(),
+ }
+ }
+}
+
+impl EventValue for ConnectionEvent {
+ fn id() -> &'static str {
+ "ConnectionEvent"
+ }
+}
+
+impl EventValue for PeerPoolEvent {
+ fn id() -> &'static str {
+ "PeerPoolEvent"
+ }
+}
+
+impl EventValue for DiscoveryEvent {
+ fn id() -> &'static str {
+ "DiscoveryEvent"
+ }
+}
+
+impl EventValueTopic for ConnectionEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Connection
+ }
+}
+
+impl EventValueTopic for PeerPoolEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::PeerPool
+ }
+}
+
+impl EventValueTopic for DiscoveryEvent {
+ type Topic = MonitorTopic;
+ fn topic() -> Self::Topic {
+ MonitorTopic::Discovery
+ }
+}
+
+fn get_current_timestamp() -> i64 {
+ chrono::Utc::now().timestamp()
+}