aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-22 18:02:51 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-22 18:02:51 +0200
commit5df6812dd2254b871eb773dc626b89646ca87495 (patch)
tree18828be9c86aba205d30086f705f2c6e2aeee975 /p2p/src/discovery/refresh.rs
parentc2860d3266aa1233787400b163d527fdc7dafe61 (diff)
p2p: monitor system use core::EventSys instead of pubsub pattern
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs18
1 files changed, 10 insertions, 8 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 0c49ac2..430e749 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -119,9 +119,7 @@ impl RefreshService {
sleep(Duration::from_secs(self.config.refresh_interval)).await;
trace!("Start refreshing the routing table...");
- self.monitor
- .notify(&DiscoveryEvent::RefreshStarted.into())
- .await;
+ self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
for bucket in self.table.lock().await.iter() {
@@ -213,13 +211,15 @@ impl RefreshService {
let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {
Ok(c) => {
self.monitor
- .notify(&ConnEvent::Listening(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone())))
.await;
c
}
Err(err) => {
self.monitor
- .notify(&ConnEvent::ListenFailed(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed(
+ endpoint.clone(),
+ )))
.await;
return Err(err.into());
}
@@ -230,7 +230,9 @@ impl RefreshService {
let res = self.listen_to_ping_msg(&conn).await;
if let Err(err) = res {
trace!("Failed to handle ping msg {err}");
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ self.monitor
+ .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed))
+ .await;
}
}
}
@@ -239,7 +241,7 @@ impl RefreshService {
async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {
let (msg, endpoint) = conn.recv().await?;
self.monitor
- .notify(&ConnEvent::Accepted(endpoint.clone()).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone())))
.await;
match msg {
@@ -251,7 +253,7 @@ impl RefreshService {
}
self.monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
+ .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))
.await;
Ok(())
}