aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/src/connection.rs15
-rw-r--r--p2p/src/discovery/mod.rs38
-rw-r--r--p2p/src/discovery/refresh.rs2
-rw-r--r--p2p/src/error.rs3
-rw-r--r--p2p/src/message.rs9
-rw-r--r--p2p/src/peer_pool.rs56
-rw-r--r--p2p/src/routing_table/bucket.rs21
-rw-r--r--p2p/src/routing_table/mod.rs12
8 files changed, 111 insertions, 45 deletions
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index 7c81aa2..8ec2617 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -1,10 +1,12 @@
-use smol::{channel::Sender, lock::Mutex};
use std::{collections::VecDeque, fmt, sync::Arc};
-use karyons_core::async_utils::CondVar;
+use smol::{channel::Sender, lock::Mutex};
+use karyons_core::async_utils::CondVar;
use karyons_net::Conn;
+use crate::Result;
+
/// Defines the direction of a network connection.
#[derive(Clone, Debug)]
pub enum ConnDirection {
@@ -24,7 +26,7 @@ impl fmt::Display for ConnDirection {
pub struct NewConn {
pub direction: ConnDirection,
pub conn: Conn,
- pub disconnect_signal: Sender<()>,
+ pub disconnect_signal: Sender<Result<()>>,
}
/// Connection queue
@@ -42,7 +44,7 @@ impl ConnQueue {
}
/// Push a connection into the queue and wait for the disconnect signal
- pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
+ pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> {
let (disconnect_signal, chan) = smol::channel::bounded(1);
let new_conn = NewConn {
direction,
@@ -51,7 +53,10 @@ impl ConnQueue {
};
self.queue.lock().await.push_back(new_conn);
self.conn_available.signal();
- let _ = chan.recv().await;
+ if let Ok(result) = chan.recv().await {
+ return result;
+ }
+ Ok(())
}
/// Receive the next connection in the queue
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7d37eec..7f55309 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -21,8 +21,8 @@ use crate::{
listener::Listener,
monitor::Monitor,
routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
- UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
+ INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
@@ -169,8 +169,8 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
- let callback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
+ let callback = |c: Conn| async move {
+ selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
Ok(())
};
@@ -205,12 +205,34 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
let selfc = self.clone();
- let pid_cloned = pid.clone();
+ let pid_c = pid.clone();
+ let endpoint_c = endpoint.clone();
let cback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
- if let Some(pid) = pid_cloned {
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid_c {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint_c} incompatible peer");
+ selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ // TODO
+ selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ }
+ Err(_) => {
+ selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
+ }
+ Ok(_) => {
+ selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ }
}
+
Ok(())
};
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index b9b7bae..d095f19 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -148,7 +148,7 @@ impl RefreshService {
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {
- if bucket_entry.is_connected() {
+ if bucket_entry.is_connected() || bucket_entry.is_incompatible() {
continue;
}
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index 5a62676..0c1d50c 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -14,6 +14,9 @@ pub enum Error {
#[error("Invalid message error: {0}")]
InvalidMsg(String),
+ #[error("Incompatible Peer")]
+ IncompatiblePeer,
+
#[error(transparent)]
ParseIntError(#[from] std::num::ParseIntError),
diff --git a/p2p/src/message.rs b/p2p/src/message.rs
index 9e73809..3779cc1 100644
--- a/p2p/src/message.rs
+++ b/p2p/src/message.rs
@@ -62,9 +62,14 @@ pub struct VerMsg {
pub protocols: HashMap<ProtocolID, VersionInt>,
}
-/// VerAck message acknowledging the receipt of a Version message.
+/// VerAck message acknowledges the receipt of a Version message. The message
+/// consists of the peer ID and an acknowledgment boolean value indicating
+/// whether the version is accepted.
#[derive(Decode, Encode, Debug, Clone)]
-pub struct VerAckMsg(pub PeerID);
+pub struct VerAckMsg {
+ pub peer_id: PeerID,
+ pub ack: bool,
+}
/// Shutdown message.
#[derive(Decode, Encode, Debug, Clone)]
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index e2a9de7..a0079f2 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -100,18 +100,15 @@ impl PeerPool {
pub async fn listen_loop(self: Arc<Self>) {
loop {
let new_conn = self.conn_queue.next().await;
- let disconnect_signal = new_conn.disconnect_signal;
+ let signal = new_conn.disconnect_signal;
let result = self
- .new_peer(
- new_conn.conn,
- &new_conn.direction,
- disconnect_signal.clone(),
- )
+ .new_peer(new_conn.conn, &new_conn.direction, signal.clone())
.await;
+ // Only send a disconnect signal if there is an error when adding a peer.
if result.is_err() {
- let _ = disconnect_signal.send(()).await;
+ let _ = signal.send(result).await;
}
}
}
@@ -155,12 +152,12 @@ impl PeerPool {
self: &Arc<Self>,
conn: Conn,
conn_direction: &ConnDirection,
- disconnect_signal: Sender<()>,
- ) -> Result<PeerID> {
+ disconnect_signal: Sender<Result<()>>,
+ ) -> Result<()> {
let endpoint = conn.peer_endpoint()?;
let io_codec = IOCodec::new(conn);
- // Do a handshake with a connection before creating a new peer.
+ // Do a handshake with the connection before creating a new peer.
let pid = self.do_handshake(&io_codec, conn_direction).await?;
// TODO: Consider restricting the subnet for inbound connections
@@ -184,11 +181,11 @@ impl PeerPool {
let selfc = self.clone();
let pid_c = pid.clone();
let on_disconnect = |result| async move {
- if let TaskResult::Completed(_) = result {
+ if let TaskResult::Completed(result) = result {
if let Err(err) = selfc.remove_peer(&pid_c).await {
error!("Failed to remove peer {pid_c}: {err}");
}
- let _ = disconnect_signal.send(()).await;
+ let _ = disconnect_signal.send(result).await;
}
};
@@ -200,7 +197,8 @@ impl PeerPool {
self.monitor
.notify(&PeerPoolEvent::NewPeer(pid.clone()).into())
.await;
- Ok(pid)
+
+ Ok(())
}
/// Checks if the peer list contains a peer with the given peer id
@@ -244,10 +242,19 @@ impl PeerPool {
) -> Result<PeerID> {
match conn_direction {
ConnDirection::Inbound => {
- let pid = self.wait_vermsg(io_codec).await?;
- self.send_verack(io_codec).await?;
- Ok(pid)
+ let result = self.wait_vermsg(io_codec).await;
+ match result {
+ Ok(_) => {
+ self.send_verack(io_codec, true).await?;
+ }
+ Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
+ self.send_verack(io_codec, false).await?;
+ }
+ _ => {}
+ }
+ result
}
+
ConnDirection::Outbound => {
self.send_vermsg(io_codec).await?;
self.wait_verack(io_codec).await
@@ -293,10 +300,13 @@ impl PeerPool {
}
/// Send a Verack message
- async fn send_verack(&self, io_codec: &IOCodec) -> Result<()> {
- let verack = VerAckMsg(self.id.clone());
+ async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> {
+ let verack = VerAckMsg {
+ peer_id: self.id.clone(),
+ ack,
+ };
- trace!("Send VerAckMsg");
+ trace!("Send VerAckMsg {:?}", verack);
io_codec.write(NetMsgCmd::Verack, &verack).await?;
Ok(())
}
@@ -311,8 +321,12 @@ impl PeerPool {
let payload = get_msg_payload!(Verack, msg);
let (verack, _) = decode::<VerAckMsg>(&payload)?;
- trace!("Received VerAckMsg from: {}", verack.0);
- Ok(verack.0)
+ if !verack.ack {
+ return Err(Error::IncompatiblePeer);
+ }
+
+ trace!("Received VerAckMsg from: {}", verack.peer_id);
+ Ok(verack.peer_id)
}
/// Check if the new connection has compatible protocols.
diff --git a/p2p/src/routing_table/bucket.rs b/p2p/src/routing_table/bucket.rs
index 13edd24..0f43b13 100644
--- a/p2p/src/routing_table/bucket.rs
+++ b/p2p/src/routing_table/bucket.rs
@@ -6,23 +6,28 @@ use rand::{rngs::OsRng, seq::SliceRandom};
pub type EntryStatusFlag = u16;
/// The entry is connected.
-pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001;
+pub const CONNECTED_ENTRY: EntryStatusFlag = 0b000001;
/// The entry is disconnected. This will increase the failure counter.
-pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010;
+pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b000010;
/// The entry is ready to reconnect, meaning it has either been added and
/// has no connection attempts, or it has been refreshed.
-pub const PENDING_ENTRY: EntryStatusFlag = 0b00100;
+pub const PENDING_ENTRY: EntryStatusFlag = 0b000100;
/// The entry is unreachable. This will increase the failure counter.
-pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000;
+pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b001000;
/// The entry is unstable. This will increase the failure counter.
-pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000;
+pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b010000;
+
+/// The entry is incompatible. This entry will not contribute to an increase in
+/// failure attempts, instead, it will persist in the routing table for the
+/// lookup process and will only be removed in the presence of a new entry.
+pub const INCOMPATIBLE_ENTRY: EntryStatusFlag = 0b100000;
#[allow(dead_code)]
-pub const ALL_ENTRY: EntryStatusFlag = 0b11111;
+pub const ALL_ENTRY: EntryStatusFlag = 0b111111;
/// A BucketEntry represents a peer in the routing table.
#[derive(Clone, Debug)]
@@ -38,6 +43,10 @@ impl BucketEntry {
self.status ^ CONNECTED_ENTRY == 0
}
+ pub fn is_incompatible(&self) -> bool {
+ self.status ^ INCOMPATIBLE_ENTRY == 0
+ }
+
pub fn is_unreachable(&self) -> bool {
self.status ^ UNREACHABLE_ENTRY == 0
}
diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs
index abf9a08..35729da 100644
--- a/p2p/src/routing_table/mod.rs
+++ b/p2p/src/routing_table/mod.rs
@@ -1,8 +1,8 @@
mod bucket;
mod entry;
pub use bucket::{
- Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
- UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
+ PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
};
pub use entry::{xor_distance, Entry, Key};
@@ -82,6 +82,14 @@ impl RoutingTable {
return AddEntryResult::Added;
}
+ // Replace it with an incompatible entry if one exists.
+ let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned();
+ if let Some(e) = incompatible_entry {
+ bucket.remove(&e.entry.key);
+ bucket.add(&entry);
+ return AddEntryResult::Added;
+ }
+
// If the bucket is full, the entry is ignored.
AddEntryResult::Ignored
}