From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- p2p/src/peer_pool.rs | 296 ++++++++++++++------------------------------------- 1 file changed, 80 insertions(+), 216 deletions(-) (limited to 'p2p/src/peer_pool.rs') diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 1f3ca55..549dc76 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -1,26 +1,24 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; -use async_channel::Sender; -use bincode::{Decode, Encode}; -use log::{error, info, trace, warn}; +use bincode::Encode; +use log::{error, info, warn}; use karyon_core::{ async_runtime::{lock::RwLock, Executor}, - async_util::{timeout, TaskGroup, TaskResult}, - util::decode, + async_util::{TaskGroup, TaskResult}, }; -use karyon_net::{Conn, Endpoint}; +use karyon_net::Endpoint; use crate::{ config::Config, - conn_queue::{ConnDirection, ConnQueue}, - message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, + conn_queue::ConnQueue, + connection::Connection, monitor::{Monitor, PPEvent}, peer::Peer, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, - version::{version_match, Version, VersionInt}, + version::Version, Error, PeerID, Result, }; @@ -37,8 +35,8 @@ pub struct PeerPool { /// Hashmap contains protocol constructors. pub(crate) protocols: RwLock>>, - /// Hashmap contains protocol IDs and their versions. - protocol_versions: Arc>>, + /// Hashmap contains protocols with their versions + pub(crate) protocol_versions: RwLock>, /// Managing spawned tasks. task_group: TaskGroup, @@ -47,7 +45,7 @@ pub struct PeerPool { executor: Executor, /// The Configuration for the P2P network. - pub(crate) config: Arc, + config: Arc, /// Responsible for network and system monitoring. monitor: Arc, @@ -62,15 +60,12 @@ impl PeerPool { monitor: Arc, executor: Executor, ) -> Arc { - let protocols = RwLock::new(HashMap::new()); - let protocol_versions = Arc::new(RwLock::new(HashMap::new())); - Arc::new(Self { id: id.clone(), conn_queue, peers: RwLock::new(HashMap::new()), - protocols, - protocol_versions, + protocols: RwLock::new(HashMap::new()), + protocol_versions: RwLock::new(HashMap::new()), task_group: TaskGroup::with_executor(executor.clone()), executor, monitor, @@ -80,21 +75,15 @@ impl PeerPool { /// Starts the [`PeerPool`] pub async fn start(self: &Arc) -> Result<()> { - self.setup_protocols().await?; - self.task_group.spawn( - { - let this = self.clone(); - async move { this.listen_loop().await } - }, - |_| async {}, - ); + self.setup_core_protocols().await?; + self.task_group.spawn(self.clone().run(), |_| async {}); Ok(()) } /// Shuts down pub async fn shutdown(&self) { for (_, peer) in self.peers.read().await.iter() { - peer.shutdown().await; + let _ = peer.shutdown().await; } self.task_group.cancel().await; @@ -102,76 +91,24 @@ impl PeerPool { /// Attach a custom protocol to the network pub async fn attach_protocol(&self, c: Box) -> Result<()> { - let protocol_versions = &mut self.protocol_versions.write().await; - let protocols = &mut self.protocols.write().await; - - protocol_versions.insert(P::id(), P::version()?); - protocols.insert(P::id(), c); + self.protocols.write().await.insert(P::id(), c); + self.protocol_versions + .write() + .await + .insert(P::id(), P::version()?); Ok(()) } /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { + pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { for (pid, peer) in self.peers.read().await.iter() { - if let Err(err) = peer.send(proto_id, msg).await { + if let Err(err) = peer.conn.send(proto_id.to_string(), msg).await { error!("failed to send msg to {pid}: {err}"); continue; } } } - /// Add a new peer to the peer list. - pub async fn new_peer( - self: &Arc, - conn: Conn, - conn_direction: &ConnDirection, - disconnect_signal: Sender>, - ) -> Result<()> { - let endpoint = conn.peer_endpoint()?; - - // Do a handshake with the connection before creating a new peer. - let pid = self.do_handshake(&conn, conn_direction).await?; - - // TODO: Consider restricting the subnet for inbound connections - if self.contains_peer(&pid).await { - return Err(Error::PeerAlreadyConnected); - } - - // Create a new peer - let peer = Peer::new( - Arc::downgrade(self), - &pid, - conn, - endpoint.clone(), - conn_direction.clone(), - self.executor.clone(), - ); - - // Insert the new peer - self.peers.write().await.insert(pid.clone(), peer.clone()); - - let on_disconnect = { - let this = self.clone(); - let pid = pid.clone(); - |result| async move { - if let TaskResult::Completed(result) = result { - if let Err(err) = this.remove_peer(&pid).await { - error!("Failed to remove peer {pid}: {err}"); - } - let _ = disconnect_signal.send(result).await; - } - } - }; - - self.task_group.spawn(peer.run(), on_disconnect); - - info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); - - self.monitor.notify(PPEvent::NewPeer(pid.clone())).await; - - Ok(()) - } - /// Checks if the peer list contains a peer with the given peer id pub async fn contains_peer(&self, pid: &PeerID) -> bool { self.peers.read().await.contains_key(pid) @@ -204,162 +141,89 @@ impl PeerPool { peers } - /// Listens to a new connection from the connection queue - async fn listen_loop(self: Arc) { + async fn run(self: Arc) { loop { - let conn = self.conn_queue.next().await; - let signal = conn.disconnect_signal; + let mut conn = self.conn_queue.next().await; + + for protocol_id in self.protocols.read().await.keys() { + conn.register_protocol(protocol_id.to_string()).await; + } - let result = self - .new_peer(conn.conn, &conn.direction, signal.clone()) - .await; + let conn = Arc::new(conn); - // Only send a disconnect signal if there is an error when adding a peer. + let result = self.new_peer(conn.clone()).await; + + // Disconnect if there is an error when adding a peer. if result.is_err() { - let _ = signal.send(result).await; + let _ = conn.disconnect(result).await; } } } - /// Shuts down the peer and remove it from the peer list. - async fn remove_peer(&self, pid: &PeerID) -> Result<()> { - let result = self.peers.write().await.remove(pid); - - let peer = match result { - Some(p) => p, - None => return Ok(()), - }; - - peer.shutdown().await; - - self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await; - - let endpoint = peer.remote_endpoint(); - let direction = peer.direction(); + /// Add a new peer to the peer list. + async fn new_peer(self: &Arc, conn: Arc) -> Result<()> { + // Create a new peer + let peer = Peer::new( + self.id.clone(), + Arc::downgrade(self), + conn.clone(), + self.config.clone(), + self.executor.clone(), + ); + peer.init().await?; + let pid = peer.id().expect("Get peer id after peer initialization"); - warn!("Peer {pid} removed, direction: {direction}, endpoint: {endpoint}",); - Ok(()) - } + // TODO: Consider restricting the subnet for inbound connections + if self.contains_peer(&pid).await { + return Err(Error::PeerAlreadyConnected); + } - /// Attach the core protocols. - async fn setup_protocols(&self) -> Result<()> { - let executor = self.executor.clone(); - let c = move |peer| PingProtocol::new(peer, executor.clone()); - self.attach_protocol::(Box::new(c)).await - } + // Insert the new peer + self.peers.write().await.insert(pid.clone(), peer.clone()); - /// Initiate a handshake with a connection. - async fn do_handshake( - &self, - conn: &Conn, - conn_direction: &ConnDirection, - ) -> Result { - trace!("Handshake started: {}", conn.peer_endpoint()?); - match conn_direction { - ConnDirection::Inbound => { - let result = self.wait_vermsg(conn).await; - match result { - Ok(_) => { - self.send_verack(conn, true).await?; - } - Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => { - self.send_verack(conn, false).await?; + let on_disconnect = { + let this = self.clone(); + let pid = pid.clone(); + |result| async move { + if let TaskResult::Completed(_) = result { + if let Err(err) = this.remove_peer(&pid).await { + error!("Failed to remove peer {pid}: {err}"); } - _ => {} } - result - } - - ConnDirection::Outbound => { - self.send_vermsg(conn).await?; - self.wait_verack(conn).await } - } - } + }; - /// Send a Version message - async fn send_vermsg(&self, conn: &Conn) -> Result<()> { - let pids = self.protocol_versions.read().await; - let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect(); - drop(pids); + self.task_group.spawn(peer.run(), on_disconnect); - let vermsg = VerMsg { - peer_id: self.id.clone(), - protocols, - version: self.config.version.v.clone(), - }; + info!("Add new peer {pid}"); + self.monitor.notify(PPEvent::NewPeer(pid)).await; - trace!("Send VerMsg"); - conn.send(NetMsg::new(NetMsgCmd::Version, &vermsg)?).await?; Ok(()) } - /// Wait for a Version message - /// - /// Returns the peer's ID upon successfully receiving the Version message. - async fn wait_vermsg(&self, conn: &Conn) -> Result { - let t = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = timeout(t, conn.recv()).await??; - - let payload = get_msg_payload!(Version, msg); - let (vermsg, _) = decode::(&payload)?; - - if !version_match(&self.config.version.req, &vermsg.version) { - return Err(Error::IncompatibleVersion("system: {}".into())); - } - - self.protocols_match(&vermsg.protocols).await?; - - trace!("Received VerMsg from: {}", vermsg.peer_id); - Ok(vermsg.peer_id) - } + /// Shuts down the peer and remove it from the peer list. + async fn remove_peer(&self, pid: &PeerID) -> Result<()> { + let result = self.peers.write().await.remove(pid); - /// Send a Verack message - async fn send_verack(&self, conn: &Conn, ack: bool) -> Result<()> { - let verack = VerAckMsg { - peer_id: self.id.clone(), - ack, + let peer = match result { + Some(p) => p, + None => return Ok(()), }; - trace!("Send VerAckMsg {:?}", verack); - conn.send(NetMsg::new(NetMsgCmd::Verack, &verack)?).await?; - Ok(()) - } - - /// Wait for a Verack message - /// - /// Returns the peer's ID upon successfully receiving the Verack message. - async fn wait_verack(&self, conn: &Conn) -> Result { - let t = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = timeout(t, conn.recv()).await??; + let _ = peer.shutdown().await; - let payload = get_msg_payload!(Verack, msg); - let (verack, _) = decode::(&payload)?; - - if !verack.ack { - return Err(Error::IncompatiblePeer); - } + self.monitor.notify(PPEvent::RemovePeer(pid.clone())).await; - trace!("Received VerAckMsg from: {}", verack.peer_id); - Ok(verack.peer_id) + warn!("Peer {pid} removed",); + Ok(()) } - /// Check if the new connection has compatible protocols. - async fn protocols_match(&self, protocols: &HashMap) -> Result<()> { - for (n, pv) in protocols.iter() { - let pids = self.protocol_versions.read().await; - - match pids.get(n) { - Some(v) => { - if !version_match(&v.req, pv) { - return Err(Error::IncompatibleVersion(format!("{n} protocol: {pv}"))); - } - } - None => { - return Err(Error::UnsupportedProtocol(n.to_string())); - } - } - } - Ok(()) + /// Attach the core protocols. + async fn setup_core_protocols(&self) -> Result<()> { + let executor = self.executor.clone(); + let ping_interval = self.config.ping_interval; + let ping_timeout = self.config.ping_timeout; + let c = move |peer| PingProtocol::new(peer, ping_interval, ping_timeout, executor.clone()); + self.attach_protocol::(Box::new(c)).await } } -- cgit v1.2.3