From 34b0a91dbb107962dae4f593a36d30a29ea87c45 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@proton.me>
Date: Wed, 22 Nov 2023 12:42:00 +0300
Subject: p2p: Improve error handling during handshake:

Introduce a new entry status, INCOMPATIBLE_ENTRY. Entries with this
status will not increase the failure attempts, instead, they will persist in
the routing table until replaced by a new peer. This feature is useful for
seeding and the lookup process.

Add a boolean value to the VerAck message to indicate whether the
version is accepted or not.
---
 p2p/src/connection.rs           | 15 +++++++----
 p2p/src/discovery/mod.rs        | 38 ++++++++++++++++++++++------
 p2p/src/discovery/refresh.rs    |  2 +-
 p2p/src/error.rs                |  3 +++
 p2p/src/message.rs              |  9 +++++--
 p2p/src/peer_pool.rs            | 56 +++++++++++++++++++++++++----------------
 p2p/src/routing_table/bucket.rs | 21 +++++++++++-----
 p2p/src/routing_table/mod.rs    | 12 +++++++--
 8 files changed, 111 insertions(+), 45 deletions(-)

(limited to 'p2p')

diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index 7c81aa2..8ec2617 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -1,10 +1,12 @@
-use smol::{channel::Sender, lock::Mutex};
 use std::{collections::VecDeque, fmt, sync::Arc};
 
-use karyons_core::async_utils::CondVar;
+use smol::{channel::Sender, lock::Mutex};
 
+use karyons_core::async_utils::CondVar;
 use karyons_net::Conn;
 
+use crate::Result;
+
 /// Defines the direction of a network connection.
 #[derive(Clone, Debug)]
 pub enum ConnDirection {
@@ -24,7 +26,7 @@ impl fmt::Display for ConnDirection {
 pub struct NewConn {
     pub direction: ConnDirection,
     pub conn: Conn,
-    pub disconnect_signal: Sender<()>,
+    pub disconnect_signal: Sender<Result<()>>,
 }
 
 /// Connection queue
@@ -42,7 +44,7 @@ impl ConnQueue {
     }
 
     /// Push a connection into the queue and wait for the disconnect signal
-    pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
+    pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> {
         let (disconnect_signal, chan) = smol::channel::bounded(1);
         let new_conn = NewConn {
             direction,
@@ -51,7 +53,10 @@ impl ConnQueue {
         };
         self.queue.lock().await.push_back(new_conn);
         self.conn_available.signal();
-        let _ = chan.recv().await;
+        if let Ok(result) = chan.recv().await {
+            return result;
+        }
+        Ok(())
     }
 
     /// Receive the next connection in the queue
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7d37eec..7f55309 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -21,8 +21,8 @@ use crate::{
     listener::Listener,
     monitor::Monitor,
     routing_table::{
-        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
-        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
+        INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
     },
     slots::ConnectionSlots,
     Error, PeerID, Result,
@@ -169,8 +169,8 @@ impl Discovery {
     /// Start a listener and on success, return the resolved endpoint.
     async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
         let selfc = self.clone();
-        let callback = |conn: Conn| async move {
-            selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
+        let callback = |c: Conn| async move {
+            selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
             Ok(())
         };
 
@@ -205,12 +205,34 @@ impl Discovery {
     /// Connect to the given endpoint using the connector
     async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
         let selfc = self.clone();
-        let pid_cloned = pid.clone();
+        let pid_c = pid.clone();
+        let endpoint_c = endpoint.clone();
         let cback = |conn: Conn| async move {
-            selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-            if let Some(pid) = pid_cloned {
-                selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+            let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+            // If the entry is not in the routing table, ignore the result
+            let pid = match pid_c {
+                Some(p) => p,
+                None => return Ok(()),
+            };
+
+            match result {
+                Err(Error::IncompatiblePeer) => {
+                    error!("Failed to do handshake: {endpoint_c} incompatible peer");
+                    selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
+                }
+                Err(Error::PeerAlreadyConnected) => {
+                    // TODO
+                    selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+                }
+                Err(_) => {
+                    selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
+                }
+                Ok(_) => {
+                    selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+                }
             }
+
             Ok(())
         };
 
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index b9b7bae..d095f19 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -148,7 +148,7 @@ impl RefreshService {
         for chunk in entries.chunks(16) {
             let mut tasks = Vec::new();
             for bucket_entry in chunk {
-                if bucket_entry.is_connected() {
+                if bucket_entry.is_connected() || bucket_entry.is_incompatible() {
                     continue;
                 }
 
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index 5a62676..0c1d50c 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -14,6 +14,9 @@ pub enum Error {
     #[error("Invalid message error: {0}")]
     InvalidMsg(String),
 
+    #[error("Incompatible Peer")]
+    IncompatiblePeer,
+
     #[error(transparent)]
     ParseIntError(#[from] std::num::ParseIntError),
 
diff --git a/p2p/src/message.rs b/p2p/src/message.rs
index 9e73809..3779cc1 100644
--- a/p2p/src/message.rs
+++ b/p2p/src/message.rs
@@ -62,9 +62,14 @@ pub struct VerMsg {
     pub protocols: HashMap<ProtocolID, VersionInt>,
 }
 
-/// VerAck message acknowledging the receipt of a Version message.
+/// VerAck message acknowledges the receipt of a Version message. The message
+/// consists of the peer ID and an acknowledgment boolean value indicating
+/// whether the version is accepted.
 #[derive(Decode, Encode, Debug, Clone)]
-pub struct VerAckMsg(pub PeerID);
+pub struct VerAckMsg {
+    pub peer_id: PeerID,
+    pub ack: bool,
+}
 
 /// Shutdown message.
 #[derive(Decode, Encode, Debug, Clone)]
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index e2a9de7..a0079f2 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -100,18 +100,15 @@ impl PeerPool {
     pub async fn listen_loop(self: Arc<Self>) {
         loop {
             let new_conn = self.conn_queue.next().await;
-            let disconnect_signal = new_conn.disconnect_signal;
+            let signal = new_conn.disconnect_signal;
 
             let result = self
-                .new_peer(
-                    new_conn.conn,
-                    &new_conn.direction,
-                    disconnect_signal.clone(),
-                )
+                .new_peer(new_conn.conn, &new_conn.direction, signal.clone())
                 .await;
 
+            // Only send a disconnect signal if there is an error when adding a peer.
             if result.is_err() {
-                let _ = disconnect_signal.send(()).await;
+                let _ = signal.send(result).await;
             }
         }
     }
@@ -155,12 +152,12 @@ impl PeerPool {
         self: &Arc<Self>,
         conn: Conn,
         conn_direction: &ConnDirection,
-        disconnect_signal: Sender<()>,
-    ) -> Result<PeerID> {
+        disconnect_signal: Sender<Result<()>>,
+    ) -> Result<()> {
         let endpoint = conn.peer_endpoint()?;
         let io_codec = IOCodec::new(conn);
 
-        // Do a handshake with a connection before creating a new peer.
+        // Do a handshake with the connection before creating a new peer.
         let pid = self.do_handshake(&io_codec, conn_direction).await?;
 
         // TODO: Consider restricting the subnet for inbound connections
@@ -184,11 +181,11 @@ impl PeerPool {
         let selfc = self.clone();
         let pid_c = pid.clone();
         let on_disconnect = |result| async move {
-            if let TaskResult::Completed(_) = result {
+            if let TaskResult::Completed(result) = result {
                 if let Err(err) = selfc.remove_peer(&pid_c).await {
                     error!("Failed to remove peer {pid_c}: {err}");
                 }
-                let _ = disconnect_signal.send(()).await;
+                let _ = disconnect_signal.send(result).await;
             }
         };
 
@@ -200,7 +197,8 @@ impl PeerPool {
         self.monitor
             .notify(&PeerPoolEvent::NewPeer(pid.clone()).into())
             .await;
-        Ok(pid)
+
+        Ok(())
     }
 
     /// Checks if the peer list contains a peer with the given peer id
@@ -244,10 +242,19 @@ impl PeerPool {
     ) -> Result<PeerID> {
         match conn_direction {
             ConnDirection::Inbound => {
-                let pid = self.wait_vermsg(io_codec).await?;
-                self.send_verack(io_codec).await?;
-                Ok(pid)
+                let result = self.wait_vermsg(io_codec).await;
+                match result {
+                    Ok(_) => {
+                        self.send_verack(io_codec, true).await?;
+                    }
+                    Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
+                        self.send_verack(io_codec, false).await?;
+                    }
+                    _ => {}
+                }
+                result
             }
+
             ConnDirection::Outbound => {
                 self.send_vermsg(io_codec).await?;
                 self.wait_verack(io_codec).await
@@ -293,10 +300,13 @@ impl PeerPool {
     }
 
     /// Send a Verack message
-    async fn send_verack(&self, io_codec: &IOCodec) -> Result<()> {
-        let verack = VerAckMsg(self.id.clone());
+    async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> {
+        let verack = VerAckMsg {
+            peer_id: self.id.clone(),
+            ack,
+        };
 
-        trace!("Send VerAckMsg");
+        trace!("Send VerAckMsg {:?}", verack);
         io_codec.write(NetMsgCmd::Verack, &verack).await?;
         Ok(())
     }
@@ -311,8 +321,12 @@ impl PeerPool {
         let payload = get_msg_payload!(Verack, msg);
         let (verack, _) = decode::<VerAckMsg>(&payload)?;
 
-        trace!("Received VerAckMsg from: {}", verack.0);
-        Ok(verack.0)
+        if !verack.ack {
+            return Err(Error::IncompatiblePeer);
+        }
+
+        trace!("Received VerAckMsg from: {}", verack.peer_id);
+        Ok(verack.peer_id)
     }
 
     /// Check if the new connection has compatible protocols.
diff --git a/p2p/src/routing_table/bucket.rs b/p2p/src/routing_table/bucket.rs
index 13edd24..0f43b13 100644
--- a/p2p/src/routing_table/bucket.rs
+++ b/p2p/src/routing_table/bucket.rs
@@ -6,23 +6,28 @@ use rand::{rngs::OsRng, seq::SliceRandom};
 pub type EntryStatusFlag = u16;
 
 /// The entry is connected.
-pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001;
+pub const CONNECTED_ENTRY: EntryStatusFlag = 0b000001;
 
 /// The entry is disconnected. This will increase the failure counter.
-pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010;
+pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b000010;
 
 /// The entry is ready to reconnect, meaning it has either been added and
 /// has no connection attempts, or it has been refreshed.
-pub const PENDING_ENTRY: EntryStatusFlag = 0b00100;
+pub const PENDING_ENTRY: EntryStatusFlag = 0b000100;
 
 /// The entry is unreachable. This will increase the failure counter.
-pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000;
+pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b001000;
 
 /// The entry is unstable. This will increase the failure counter.
-pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000;
+pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b010000;
+
+/// The entry is incompatible. This entry will not contribute to an increase in
+/// failure attempts, instead, it will persist in the routing table for the
+/// lookup process and will only be removed in the presence of a new entry.
+pub const INCOMPATIBLE_ENTRY: EntryStatusFlag = 0b100000;
 
 #[allow(dead_code)]
-pub const ALL_ENTRY: EntryStatusFlag = 0b11111;
+pub const ALL_ENTRY: EntryStatusFlag = 0b111111;
 
 /// A BucketEntry represents a peer in the routing table.
 #[derive(Clone, Debug)]
@@ -38,6 +43,10 @@ impl BucketEntry {
         self.status ^ CONNECTED_ENTRY == 0
     }
 
+    pub fn is_incompatible(&self) -> bool {
+        self.status ^ INCOMPATIBLE_ENTRY == 0
+    }
+
     pub fn is_unreachable(&self) -> bool {
         self.status ^ UNREACHABLE_ENTRY == 0
     }
diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs
index abf9a08..35729da 100644
--- a/p2p/src/routing_table/mod.rs
+++ b/p2p/src/routing_table/mod.rs
@@ -1,8 +1,8 @@
 mod bucket;
 mod entry;
 pub use bucket::{
-    Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
-    UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+    Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
+    PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
 };
 pub use entry::{xor_distance, Entry, Key};
 
@@ -82,6 +82,14 @@ impl RoutingTable {
             return AddEntryResult::Added;
         }
 
+        // Replace it with an incompatible entry if one exists.
+        let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned();
+        if let Some(e) = incompatible_entry {
+            bucket.remove(&e.entry.key);
+            bucket.add(&entry);
+            return AddEntryResult::Added;
+        }
+
         // If the bucket is full, the entry is ignored.
         AddEntryResult::Ignored
     }
-- 
cgit v1.2.3