diff options
-rw-r--r-- | p2p/src/connection.rs | 15 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 38 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 2 | ||||
-rw-r--r-- | p2p/src/error.rs | 3 | ||||
-rw-r--r-- | p2p/src/message.rs | 9 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 56 | ||||
-rw-r--r-- | p2p/src/routing_table/bucket.rs | 21 | ||||
-rw-r--r-- | p2p/src/routing_table/mod.rs | 12 |
8 files changed, 111 insertions, 45 deletions
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs index 7c81aa2..8ec2617 100644 --- a/p2p/src/connection.rs +++ b/p2p/src/connection.rs @@ -1,10 +1,12 @@ -use smol::{channel::Sender, lock::Mutex}; use std::{collections::VecDeque, fmt, sync::Arc}; -use karyons_core::async_utils::CondVar; +use smol::{channel::Sender, lock::Mutex}; +use karyons_core::async_utils::CondVar; use karyons_net::Conn; +use crate::Result; + /// Defines the direction of a network connection. #[derive(Clone, Debug)] pub enum ConnDirection { @@ -24,7 +26,7 @@ impl fmt::Display for ConnDirection { pub struct NewConn { pub direction: ConnDirection, pub conn: Conn, - pub disconnect_signal: Sender<()>, + pub disconnect_signal: Sender<Result<()>>, } /// Connection queue @@ -42,7 +44,7 @@ impl ConnQueue { } /// Push a connection into the queue and wait for the disconnect signal - pub async fn handle(&self, conn: Conn, direction: ConnDirection) { + pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> { let (disconnect_signal, chan) = smol::channel::bounded(1); let new_conn = NewConn { direction, @@ -51,7 +53,10 @@ impl ConnQueue { }; self.queue.lock().await.push_back(new_conn); self.conn_available.signal(); - let _ = chan.recv().await; + if let Ok(result) = chan.recv().await { + return result; + } + Ok(()) } /// Receive the next connection in the queue diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7d37eec..7f55309 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -21,8 +21,8 @@ use crate::{ listener::Listener, monitor::Monitor, routing_table::{ - Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, - UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, + INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }, slots::ConnectionSlots, Error, PeerID, Result, @@ -169,8 +169,8 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); - let callback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; + let callback = |c: Conn| async move { + selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; Ok(()) }; @@ -205,12 +205,34 @@ impl Discovery { /// Connect to the given endpoint using the connector async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { let selfc = self.clone(); - let pid_cloned = pid.clone(); + let pid_c = pid.clone(); + let endpoint_c = endpoint.clone(); let cback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - if let Some(pid) = pid_cloned { - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; + + // If the entry is not in the routing table, ignore the result + let pid = match pid_c { + Some(p) => p, + None => return Ok(()), + }; + + match result { + Err(Error::IncompatiblePeer) => { + error!("Failed to do handshake: {endpoint_c} incompatible peer"); + selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; + } + Err(Error::PeerAlreadyConnected) => { + // TODO + selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + } + Err(_) => { + selfc.update_entry(&pid, UNSTABLE_ENTRY).await; + } + Ok(_) => { + selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + } } + Ok(()) }; diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index b9b7bae..d095f19 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -148,7 +148,7 @@ impl RefreshService { for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { - if bucket_entry.is_connected() { + if bucket_entry.is_connected() || bucket_entry.is_incompatible() { continue; } diff --git a/p2p/src/error.rs b/p2p/src/error.rs index 5a62676..0c1d50c 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -14,6 +14,9 @@ pub enum Error { #[error("Invalid message error: {0}")] InvalidMsg(String), + #[error("Incompatible Peer")] + IncompatiblePeer, + #[error(transparent)] ParseIntError(#[from] std::num::ParseIntError), diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 9e73809..3779cc1 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -62,9 +62,14 @@ pub struct VerMsg { pub protocols: HashMap<ProtocolID, VersionInt>, } -/// VerAck message acknowledging the receipt of a Version message. +/// VerAck message acknowledges the receipt of a Version message. The message +/// consists of the peer ID and an acknowledgment boolean value indicating +/// whether the version is accepted. #[derive(Decode, Encode, Debug, Clone)] -pub struct VerAckMsg(pub PeerID); +pub struct VerAckMsg { + pub peer_id: PeerID, + pub ack: bool, +} /// Shutdown message. #[derive(Decode, Encode, Debug, Clone)] diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index e2a9de7..a0079f2 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -100,18 +100,15 @@ impl PeerPool { pub async fn listen_loop(self: Arc<Self>) { loop { let new_conn = self.conn_queue.next().await; - let disconnect_signal = new_conn.disconnect_signal; + let signal = new_conn.disconnect_signal; let result = self - .new_peer( - new_conn.conn, - &new_conn.direction, - disconnect_signal.clone(), - ) + .new_peer(new_conn.conn, &new_conn.direction, signal.clone()) .await; + // Only send a disconnect signal if there is an error when adding a peer. if result.is_err() { - let _ = disconnect_signal.send(()).await; + let _ = signal.send(result).await; } } } @@ -155,12 +152,12 @@ impl PeerPool { self: &Arc<Self>, conn: Conn, conn_direction: &ConnDirection, - disconnect_signal: Sender<()>, - ) -> Result<PeerID> { + disconnect_signal: Sender<Result<()>>, + ) -> Result<()> { let endpoint = conn.peer_endpoint()?; let io_codec = IOCodec::new(conn); - // Do a handshake with a connection before creating a new peer. + // Do a handshake with the connection before creating a new peer. let pid = self.do_handshake(&io_codec, conn_direction).await?; // TODO: Consider restricting the subnet for inbound connections @@ -184,11 +181,11 @@ impl PeerPool { let selfc = self.clone(); let pid_c = pid.clone(); let on_disconnect = |result| async move { - if let TaskResult::Completed(_) = result { + if let TaskResult::Completed(result) = result { if let Err(err) = selfc.remove_peer(&pid_c).await { error!("Failed to remove peer {pid_c}: {err}"); } - let _ = disconnect_signal.send(()).await; + let _ = disconnect_signal.send(result).await; } }; @@ -200,7 +197,8 @@ impl PeerPool { self.monitor .notify(&PeerPoolEvent::NewPeer(pid.clone()).into()) .await; - Ok(pid) + + Ok(()) } /// Checks if the peer list contains a peer with the given peer id @@ -244,10 +242,19 @@ impl PeerPool { ) -> Result<PeerID> { match conn_direction { ConnDirection::Inbound => { - let pid = self.wait_vermsg(io_codec).await?; - self.send_verack(io_codec).await?; - Ok(pid) + let result = self.wait_vermsg(io_codec).await; + match result { + Ok(_) => { + self.send_verack(io_codec, true).await?; + } + Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => { + self.send_verack(io_codec, false).await?; + } + _ => {} + } + result } + ConnDirection::Outbound => { self.send_vermsg(io_codec).await?; self.wait_verack(io_codec).await @@ -293,10 +300,13 @@ impl PeerPool { } /// Send a Verack message - async fn send_verack(&self, io_codec: &IOCodec) -> Result<()> { - let verack = VerAckMsg(self.id.clone()); + async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> { + let verack = VerAckMsg { + peer_id: self.id.clone(), + ack, + }; - trace!("Send VerAckMsg"); + trace!("Send VerAckMsg {:?}", verack); io_codec.write(NetMsgCmd::Verack, &verack).await?; Ok(()) } @@ -311,8 +321,12 @@ impl PeerPool { let payload = get_msg_payload!(Verack, msg); let (verack, _) = decode::<VerAckMsg>(&payload)?; - trace!("Received VerAckMsg from: {}", verack.0); - Ok(verack.0) + if !verack.ack { + return Err(Error::IncompatiblePeer); + } + + trace!("Received VerAckMsg from: {}", verack.peer_id); + Ok(verack.peer_id) } /// Check if the new connection has compatible protocols. diff --git a/p2p/src/routing_table/bucket.rs b/p2p/src/routing_table/bucket.rs index 13edd24..0f43b13 100644 --- a/p2p/src/routing_table/bucket.rs +++ b/p2p/src/routing_table/bucket.rs @@ -6,23 +6,28 @@ use rand::{rngs::OsRng, seq::SliceRandom}; pub type EntryStatusFlag = u16; /// The entry is connected. -pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001; +pub const CONNECTED_ENTRY: EntryStatusFlag = 0b000001; /// The entry is disconnected. This will increase the failure counter. -pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010; +pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b000010; /// The entry is ready to reconnect, meaning it has either been added and /// has no connection attempts, or it has been refreshed. -pub const PENDING_ENTRY: EntryStatusFlag = 0b00100; +pub const PENDING_ENTRY: EntryStatusFlag = 0b000100; /// The entry is unreachable. This will increase the failure counter. -pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000; +pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b001000; /// The entry is unstable. This will increase the failure counter. -pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000; +pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b010000; + +/// The entry is incompatible. This entry will not contribute to an increase in +/// failure attempts, instead, it will persist in the routing table for the +/// lookup process and will only be removed in the presence of a new entry. +pub const INCOMPATIBLE_ENTRY: EntryStatusFlag = 0b100000; #[allow(dead_code)] -pub const ALL_ENTRY: EntryStatusFlag = 0b11111; +pub const ALL_ENTRY: EntryStatusFlag = 0b111111; /// A BucketEntry represents a peer in the routing table. #[derive(Clone, Debug)] @@ -38,6 +43,10 @@ impl BucketEntry { self.status ^ CONNECTED_ENTRY == 0 } + pub fn is_incompatible(&self) -> bool { + self.status ^ INCOMPATIBLE_ENTRY == 0 + } + pub fn is_unreachable(&self) -> bool { self.status ^ UNREACHABLE_ENTRY == 0 } diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index abf9a08..35729da 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -1,8 +1,8 @@ mod bucket; mod entry; pub use bucket::{ - Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, - UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, + PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }; pub use entry::{xor_distance, Entry, Key}; @@ -82,6 +82,14 @@ impl RoutingTable { return AddEntryResult::Added; } + // Replace it with an incompatible entry if one exists. + let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned(); + if let Some(e) = incompatible_entry { + bucket.remove(&e.entry.key); + bucket.add(&entry); + return AddEntryResult::Added; + } + // If the bucket is full, the entry is ignored. AddEntryResult::Ignored } |