diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-05-22 18:02:51 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-05-22 18:02:51 +0200 | 
| commit | 5df6812dd2254b871eb773dc626b89646ca87495 (patch) | |
| tree | 18828be9c86aba205d30086f705f2c6e2aeee975 /p2p/src/discovery | |
| parent | c2860d3266aa1233787400b163d527fdc7dafe61 (diff) | |
p2p: monitor system use core::EventSys instead of pubsub pattern
Diffstat (limited to 'p2p/src/discovery')
| -rw-r--r-- | p2p/src/discovery/lookup.rs | 15 | ||||
| -rw-r--r-- | p2p/src/discovery/refresh.rs | 18 | 
2 files changed, 21 insertions, 12 deletions
| diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index cff4610..613c3cd 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -134,7 +134,7 @@ impl LookupService {      pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {          trace!("Lookup started {endpoint}");          self.monitor -            .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) +            .notify(DiscoveryEvent::LookupStarted(endpoint.clone()))              .await;          let mut random_peers = vec![]; @@ -143,7 +143,7 @@ impl LookupService {              .await          {              self.monitor -                .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) +                .notify(DiscoveryEvent::LookupFailed(endpoint.clone()))                  .await;              return Err(err);          }; @@ -166,7 +166,10 @@ impl LookupService {          drop(table);          self.monitor -            .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) +            .notify(DiscoveryEvent::LookupSucceeded( +                endpoint.clone(), +                peer_buffer.len(), +            ))              .await;          Ok(()) @@ -230,10 +233,14 @@ impl LookupService {          target_peer_id: &PeerID,      ) -> Result<Vec<PeerMsg>> {          let conn = self.connector.connect(&endpoint, &peer_id).await?; +        self.monitor +            .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone()))) +            .await; +          let result = self.handle_outbound(conn, target_peer_id).await;          self.monitor -            .notify(&ConnEvent::Disconnected(endpoint).into()) +            .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))              .await;          self.outbound_slots.remove().await; diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 0c49ac2..430e749 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -119,9 +119,7 @@ impl RefreshService {              sleep(Duration::from_secs(self.config.refresh_interval)).await;              trace!("Start refreshing the routing table..."); -            self.monitor -                .notify(&DiscoveryEvent::RefreshStarted.into()) -                .await; +            self.monitor.notify(DiscoveryEvent::RefreshStarted).await;              let mut entries: Vec<BucketEntry> = vec![];              for bucket in self.table.lock().await.iter() { @@ -213,13 +211,15 @@ impl RefreshService {          let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {              Ok(c) => {                  self.monitor -                    .notify(&ConnEvent::Listening(endpoint.clone()).into()) +                    .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone())))                      .await;                  c              }              Err(err) => {                  self.monitor -                    .notify(&ConnEvent::ListenFailed(endpoint.clone()).into()) +                    .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed( +                        endpoint.clone(), +                    )))                      .await;                  return Err(err.into());              } @@ -230,7 +230,9 @@ impl RefreshService {              let res = self.listen_to_ping_msg(&conn).await;              if let Err(err) = res {                  trace!("Failed to handle ping msg {err}"); -                self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; +                self.monitor +                    .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed)) +                    .await;              }          }      } @@ -239,7 +241,7 @@ impl RefreshService {      async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {          let (msg, endpoint) = conn.recv().await?;          self.monitor -            .notify(&ConnEvent::Accepted(endpoint.clone()).into()) +            .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone())))              .await;          match msg { @@ -251,7 +253,7 @@ impl RefreshService {          }          self.monitor -            .notify(&ConnEvent::Disconnected(endpoint).into()) +            .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))              .await;          Ok(())      } | 
