aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer_pool.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
commite15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch)
tree7976f6993e4f6b3646f5bd6954189346d5ffd330 /p2p/src/peer_pool.rs
parent6c65232d741229635151671708556b9af7ef75ac (diff)
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'p2p/src/peer_pool.rs')
-rw-r--r--p2p/src/peer_pool.rs296
1 files changed, 80 insertions, 216 deletions
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 1f3ca55..549dc76 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -1,26 +1,24 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc};
-use async_channel::Sender;
-use bincode::{Decode, Encode};
-use log::{error, info, trace, warn};
+use bincode::Encode;
+use log::{error, info, warn};
use karyon_core::{
async_runtime::{lock::RwLock, Executor},
- async_util::{timeout, TaskGroup, TaskResult},
- util::decode,
+ async_util::{TaskGroup, TaskResult},
};
-use karyon_net::{Conn, Endpoint};
+use karyon_net::Endpoint;
use crate::{
config::Config,
- conn_queue::{ConnDirection, ConnQueue},
- message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
+ conn_queue::ConnQueue,
+ connection::Connection,
monitor::{Monitor, PPEvent},
peer::Peer,
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
- version::{version_match, Version, VersionInt},
+ version::Version,
Error, PeerID, Result,
};
@@ -37,8 +35,8 @@ pub struct PeerPool {
/// Hashmap contains protocol constructors.
pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>,
- /// Hashmap contains protocol IDs and their versions.
- protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>,
+ /// Hashmap contains protocols with their versions
+ pub(crate) protocol_versions: RwLock<HashMap<ProtocolID, Version>>,
/// Managing spawned tasks.
task_group: TaskGroup,
@@ -47,7 +45,7 @@ pub struct PeerPool {
executor: Executor,
/// The Configuration for the P2P network.
- pub(crate) config: Arc<Config>,
+ config: Arc<Config>,
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
@@ -62,15 +60,12 @@ impl PeerPool {
monitor: Arc<Monitor>,
executor: Executor,
) -> Arc<Self> {
- let protocols = RwLock::new(HashMap::new());
- let protocol_versions = Arc::new(RwLock::new(HashMap::new()));
-
Arc::new(Self {
id: id.clone(),
conn_queue,
peers: RwLock::new(HashMap::new()),
- protocols,
- protocol_versions,
+ protocols: RwLock::new(HashMap::new()),
+ protocol_versions: RwLock::new(HashMap::new()),
task_group: TaskGroup::with_executor(executor.clone()),
executor,
monitor,
@@ -80,21 +75,15 @@ impl PeerPool {
/// Starts the [`PeerPool`]
pub async fn start(self: &Arc<Self>) -> Result<()> {
- self.setup_protocols().await?;
- self.task_group.spawn(
- {
- let this = self.clone();
- async move { this.listen_loop().await }
- },
- |_| async {},
- );
+ self.setup_core_protocols().await?;
+ self.task_group.spawn(self.clone().run(), |_| async {});
Ok(())
}
/// Shuts down
pub async fn shutdown(&self) {
for (_, peer) in self.peers.read().await.iter() {
- peer.shutdown().await;
+ let _ = peer.shutdown().await;
}
self.task_group.cancel().await;
@@ -102,76 +91,24 @@ impl PeerPool {
/// Attach a custom protocol to the network
pub async fn attach_protocol<P: Protocol>(&self, c: Box<ProtocolConstructor>) -> Result<()> {
- let protocol_versions = &mut self.protocol_versions.write().await;
- let protocols = &mut self.protocols.write().await;
-
- protocol_versions.insert(P::id(), P::version()?);
- protocols.insert(P::id(), c);
+ self.protocols.write().await.insert(P::id(), c);
+ self.protocol_versions
+ .write()
+ .await
+ .insert(P::id(), P::version()?);
Ok(())
}
/// Broadcast a message to all connected peers using the specified protocol.
- pub async fn broadcast<T: Decode + Encode>(&self, proto_id: &ProtocolID, msg: &T) {
+ pub async fn broadcast<T: Encode>(&self, proto_id: &ProtocolID, msg: &T) {
for (pid, peer) in self.peers.read().await.iter() {
- if let Err(err) = peer.send(proto_id, msg).await {
+ if let Err(err) = peer.conn.send(proto_id.to_string(), msg).await {
error!("failed to send msg to {pid}: {err}");
continue;
}
}
}
- /// Add a new peer to the peer list.
- pub async fn new_peer(
- self: &Arc<Self>,
- conn: Conn<NetMsg>,
- conn_direction: &ConnDirection,
- disconnect_signal: Sender<Result<()>>,
- ) -> Result<()> {
- let endpoint = conn.peer_endpoint()?;
-
- // Do a handshake with the connection before creating a new peer.
- let pid = self.do_handshake(&conn, conn_direction).await?;
-
- // TODO: Consider restricting the subnet for inbound connections
- if self.contains_peer(&pid).await {
- return Err(Error::PeerAlreadyConnected);
- }
-
- // Create a new peer
- let peer = Peer::new(
- Arc::downgrade(self),
- &pid,
- conn,
- endpoint.clone(),
- conn_direction.clone(),
- self.executor.clone(),
- );
-
- // Insert the new peer
- self.peers.write().await.insert(pid.clone(), peer.clone());
-
- let on_disconnect = {
- let this = self.clone();
- let pid = pid.clone();
- |result| async move {
- if let TaskResult::Completed(result) = result {
- if let Err(err) = this.remove_peer(&pid).await {
- error!("Failed to remove peer {pid}: {err}");
- }
- let _ = disconnect_signal.send(result).await;
- }
- }
- };
-
- self.task_group.spawn(peer.run(), on_disconnect);
-
- info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");
-
- self.monitor.notify(PPEvent::NewPeer(pid.clone())).await;
-
- Ok(())
- }
-
/// Checks if the peer list contains a peer with the given peer id
pub async fn contains_peer(&self, pid: &PeerID) -> bool {
self.peers.read().await.contains_key(pid)
@@ -204,162 +141,89 @@ impl PeerPool {
peers
}
- /// Listens to a new connection from the connection queue
- async fn listen_loop(self: Arc<Self>) {
+ async fn run(self: Arc<Self>) {
loop {
- let conn = self.conn_queue.next().await;
- let signal = conn.disconnect_signal;
+ let mut conn = self.conn_queue.next().await;
+
+ for protocol_id in self.protocols.read().await.keys() {
+ conn.register_protocol(protocol_id.to_string()).await;
+ }
- let result = self
- .new_peer(conn.conn, &conn.direction, signal.clone())
- .await;
+ let conn = Arc::new(conn);
- // Only send a disconnect signal if there is an error when adding a peer.
+ let result = self.new_peer(conn.clone()).await;
+
+ // Disconnect if there is an error when adding a peer.
if result.is_err() {
- let _ = signal.send(result).await;
+ let _ = conn.disconnect(result).await;
}
}
}
- /// Shuts down the peer and remove it from the peer list.
- async fn remove_peer(&self, pid: &PeerID) -> Result<()> {
- let result = self.peers.write().await.remove(pid);
-
- let peer = match result {
- Some(p) => p,
- None => return Ok(()),
- };
-
- peer.shutdown().await;
-
- self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await;
-
- let endpoint = peer.remote_endpoint();
- let direction = peer.direction();
+ /// Add a new peer to the peer list.
+ async fn new_peer(self: &Arc<Self>, conn: Arc<Connection>) -> Result<()> {
+ // Create a new peer
+ let peer = Peer::new(
+ self.id.clone(),
+ Arc::downgrade(self),
+ conn.clone(),
+ self.config.clone(),
+ self.executor.clone(),
+ );
+ peer.init().await?;
+ let pid = peer.id().expect("Get peer id after peer initialization");
- warn!("Peer {pid} removed, direction: {direction}, endpoint: {endpoint}",);
- Ok(())
- }
+ // TODO: Consider restricting the subnet for inbound connections
+ if self.contains_peer(&pid).await {
+ return Err(Error::PeerAlreadyConnected);
+ }
- /// Attach the core protocols.
- async fn setup_protocols(&self) -> Result<()> {
- let executor = self.executor.clone();
- let c = move |peer| PingProtocol::new(peer, executor.clone());
- self.attach_protocol::<PingProtocol>(Box::new(c)).await
- }
+ // Insert the new peer
+ self.peers.write().await.insert(pid.clone(), peer.clone());
- /// Initiate a handshake with a connection.
- async fn do_handshake(
- &self,
- conn: &Conn<NetMsg>,
- conn_direction: &ConnDirection,
- ) -> Result<PeerID> {
- trace!("Handshake started: {}", conn.peer_endpoint()?);
- match conn_direction {
- ConnDirection::Inbound => {
- let result = self.wait_vermsg(conn).await;
- match result {
- Ok(_) => {
- self.send_verack(conn, true).await?;
- }
- Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
- self.send_verack(conn, false).await?;
+ let on_disconnect = {
+ let this = self.clone();
+ let pid = pid.clone();
+ |result| async move {
+ if let TaskResult::Completed(_) = result {
+ if let Err(err) = this.remove_peer(&pid).await {
+ error!("Failed to remove peer {pid}: {err}");
}
- _ => {}
}
- result
- }
-
- ConnDirection::Outbound => {
- self.send_vermsg(conn).await?;
- self.wait_verack(conn).await
}
- }
- }
+ };
- /// Send a Version message
- async fn send_vermsg(&self, conn: &Conn<NetMsg>) -> Result<()> {
- let pids = self.protocol_versions.read().await;
- let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect();
- drop(pids);
+ self.task_group.spawn(peer.run(), on_disconnect);
- let vermsg = VerMsg {
- peer_id: self.id.clone(),
- protocols,
- version: self.config.version.v.clone(),
- };
+ info!("Add new peer {pid}");
+ self.monitor.notify(PPEvent::NewPeer(pid)).await;
- trace!("Send VerMsg");
- conn.send(NetMsg::new(NetMsgCmd::Version, &vermsg)?).await?;
Ok(())
}
- /// Wait for a Version message
- ///
- /// Returns the peer's ID upon successfully receiving the Version message.
- async fn wait_vermsg(&self, conn: &Conn<NetMsg>) -> Result<PeerID> {
- let t = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = timeout(t, conn.recv()).await??;
-
- let payload = get_msg_payload!(Version, msg);
- let (vermsg, _) = decode::<VerMsg>(&payload)?;
-
- if !version_match(&self.config.version.req, &vermsg.version) {
- return Err(Error::IncompatibleVersion("system: {}".into()));
- }
-
- self.protocols_match(&vermsg.protocols).await?;
-
- trace!("Received VerMsg from: {}", vermsg.peer_id);
- Ok(vermsg.peer_id)
- }
+ /// Shuts down the peer and remove it from the peer list.
+ async fn remove_peer(&self, pid: &PeerID) -> Result<()> {
+ let result = self.peers.write().await.remove(pid);
- /// Send a Verack message
- async fn send_verack(&self, conn: &Conn<NetMsg>, ack: bool) -> Result<()> {
- let verack = VerAckMsg {
- peer_id: self.id.clone(),
- ack,
+ let peer = match result {
+ Some(p) => p,
+ None => return Ok(()),
};
- trace!("Send VerAckMsg {:?}", verack);
- conn.send(NetMsg::new(NetMsgCmd::Verack, &verack)?).await?;
- Ok(())
- }
-
- /// Wait for a Verack message
- ///
- /// Returns the peer's ID upon successfully receiving the Verack message.
- async fn wait_verack(&self, conn: &Conn<NetMsg>) -> Result<PeerID> {
- let t = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = timeout(t, conn.recv()).await??;
+ let _ = peer.shutdown().await;
- let payload = get_msg_payload!(Verack, msg);
- let (verack, _) = decode::<VerAckMsg>(&payload)?;
-
- if !verack.ack {
- return Err(Error::IncompatiblePeer);
- }
+ self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await;
- trace!("Received VerAckMsg from: {}", verack.peer_id);
- Ok(verack.peer_id)
+ warn!("Peer {pid} removed",);
+ Ok(())
}
- /// Check if the new connection has compatible protocols.
- async fn protocols_match(&self, protocols: &HashMap<ProtocolID, VersionInt>) -> Result<()> {
- for (n, pv) in protocols.iter() {
- let pids = self.protocol_versions.read().await;
-
- match pids.get(n) {
- Some(v) => {
- if !version_match(&v.req, pv) {
- return Err(Error::IncompatibleVersion(format!("{n} protocol: {pv}")));
- }
- }
- None => {
- return Err(Error::UnsupportedProtocol(n.to_string()));
- }
- }
- }
- Ok(())
+ /// Attach the core protocols.
+ async fn setup_core_protocols(&self) -> Result<()> {
+ let executor = self.executor.clone();
+ let ping_interval = self.config.ping_interval;
+ let ping_timeout = self.config.ping_timeout;
+ let c = move |peer| PingProtocol::new(peer, ping_interval, ping_timeout, executor.clone());
+ self.attach_protocol::<PingProtocol>(Box::new(c)).await
}
}