aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/mod.rs38
-rw-r--r--p2p/src/discovery/refresh.rs2
2 files changed, 31 insertions, 9 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7d37eec..7f55309 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -21,8 +21,8 @@ use crate::{
listener::Listener,
monitor::Monitor,
routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
- UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
+ INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
@@ -169,8 +169,8 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
- let callback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
+ let callback = |c: Conn| async move {
+ selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
Ok(())
};
@@ -205,12 +205,34 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
let selfc = self.clone();
- let pid_cloned = pid.clone();
+ let pid_c = pid.clone();
+ let endpoint_c = endpoint.clone();
let cback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
- if let Some(pid) = pid_cloned {
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid_c {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint_c} incompatible peer");
+ selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ // TODO
+ selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ }
+ Err(_) => {
+ selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
+ }
+ Ok(_) => {
+ selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ }
}
+
Ok(())
};
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index b9b7bae..d095f19 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -148,7 +148,7 @@ impl RefreshService {
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {
- if bucket_entry.is_connected() {
+ if bucket_entry.is_connected() || bucket_entry.is_incompatible() {
continue;
}