From 5df6812dd2254b871eb773dc626b89646ca87495 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 22 May 2024 18:02:51 +0200 Subject: p2p: monitor system use core::EventSys instead of pubsub pattern --- p2p/src/discovery/lookup.rs | 15 +++++++++++---- p2p/src/discovery/refresh.rs | 18 ++++++++++-------- 2 files changed, 21 insertions(+), 12 deletions(-) (limited to 'p2p/src/discovery') diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index cff4610..613c3cd 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -134,7 +134,7 @@ impl LookupService { pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option) -> Result<()> { trace!("Lookup started {endpoint}"); self.monitor - .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) + .notify(DiscoveryEvent::LookupStarted(endpoint.clone())) .await; let mut random_peers = vec![]; @@ -143,7 +143,7 @@ impl LookupService { .await { self.monitor - .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) + .notify(DiscoveryEvent::LookupFailed(endpoint.clone())) .await; return Err(err); }; @@ -166,7 +166,10 @@ impl LookupService { drop(table); self.monitor - .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) + .notify(DiscoveryEvent::LookupSucceeded( + endpoint.clone(), + peer_buffer.len(), + )) .await; Ok(()) @@ -230,10 +233,14 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result> { let conn = self.connector.connect(&endpoint, &peer_id).await?; + self.monitor + .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone()))) + .await; + let result = self.handle_outbound(conn, target_peer_id).await; self.monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) .await; self.outbound_slots.remove().await; diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 0c49ac2..430e749 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -119,9 +119,7 @@ impl RefreshService { sleep(Duration::from_secs(self.config.refresh_interval)).await; trace!("Start refreshing the routing table..."); - self.monitor - .notify(&DiscoveryEvent::RefreshStarted.into()) - .await; + self.monitor.notify(DiscoveryEvent::RefreshStarted).await; let mut entries: Vec = vec![]; for bucket in self.table.lock().await.iter() { @@ -213,13 +211,15 @@ impl RefreshService { let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone()))) .await; c } Err(err) => { self.monitor - .notify(&ConnEvent::ListenFailed(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed( + endpoint.clone(), + ))) .await; return Err(err.into()); } @@ -230,7 +230,9 @@ impl RefreshService { let res = self.listen_to_ping_msg(&conn).await; if let Err(err) = res { trace!("Failed to handle ping msg {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + self.monitor + .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed)) + .await; } } } @@ -239,7 +241,7 @@ impl RefreshService { async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { let (msg, endpoint) = conn.recv().await?; self.monitor - .notify(&ConnEvent::Accepted(endpoint.clone()).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone()))) .await; match msg { @@ -251,7 +253,7 @@ impl RefreshService { } self.monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) + .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint))) .await; Ok(()) } -- cgit v1.2.3