diff options
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r-- | p2p/src/discovery/mod.rs | 38 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 2 |
2 files changed, 31 insertions, 9 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7d37eec..7f55309 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -21,8 +21,8 @@ use crate::{ listener::Listener, monitor::Monitor, routing_table::{ - Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, - UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, + INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }, slots::ConnectionSlots, Error, PeerID, Result, @@ -169,8 +169,8 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); - let callback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; + let callback = |c: Conn| async move { + selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; Ok(()) }; @@ -205,12 +205,34 @@ impl Discovery { /// Connect to the given endpoint using the connector async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { let selfc = self.clone(); - let pid_cloned = pid.clone(); + let pid_c = pid.clone(); + let endpoint_c = endpoint.clone(); let cback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - if let Some(pid) = pid_cloned { - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; + + // If the entry is not in the routing table, ignore the result + let pid = match pid_c { + Some(p) => p, + None => return Ok(()), + }; + + match result { + Err(Error::IncompatiblePeer) => { + error!("Failed to do handshake: {endpoint_c} incompatible peer"); + selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; + } + Err(Error::PeerAlreadyConnected) => { + // TODO + selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + } + Err(_) => { + selfc.update_entry(&pid, UNSTABLE_ENTRY).await; + } + Ok(_) => { + selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + } } + Ok(()) }; diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index b9b7bae..d095f19 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -148,7 +148,7 @@ impl RefreshService { for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { - if bucket_entry.is_connected() { + if bucket_entry.is_connected() || bucket_entry.is_incompatible() { continue; } |