From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- p2p/src/peer/mod.rs | 247 ++++++++++++++++++++++++---------------------------- 1 file changed, 116 insertions(+), 131 deletions(-) (limited to 'p2p/src/peer') diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 6903294..a5ac7ad 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -1,138 +1,111 @@ mod peer_id; -pub use peer_id::PeerID; - use std::sync::{Arc, Weak}; use async_channel::{Receiver, Sender}; -use bincode::{Decode, Encode}; +use bincode::Encode; use log::{error, trace}; +use parking_lot::RwLock; use karyon_core::{ - async_runtime::{lock::RwLock, Executor}, + async_runtime::Executor, async_util::{select, Either, TaskGroup, TaskResult}, - event::{EventListener, EventSys}, - util::{decode, encode}, + util::decode, }; -use karyon_net::{Conn, Endpoint}; - use crate::{ - conn_queue::ConnDirection, - message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg}, + connection::{ConnDirection, Connection}, + endpoint::Endpoint, + message::{NetMsgCmd, ProtocolMsg}, peer_pool::PeerPool, - protocol::{Protocol, ProtocolEvent, ProtocolID}, + protocol::{InitProtocol, Protocol, ProtocolEvent, ProtocolID}, + protocols::HandshakeProtocol, Config, Error, Result, }; +pub use peer_id::PeerID; + pub struct Peer { + /// Own ID + own_id: PeerID, + /// Peer's ID - id: PeerID, + id: RwLock>, - /// A weak pointer to `PeerPool` + /// A weak pointer to [`PeerPool`] peer_pool: Weak, /// Holds the peer connection - conn: Conn, - - /// Remote endpoint for the peer - remote_endpoint: Endpoint, - - /// The direction of the connection, either `Inbound` or `Outbound` - conn_direction: ConnDirection, - - /// A list of protocol IDs - protocol_ids: RwLock>, - - /// `EventSys` responsible for sending events to the protocols. - protocol_events: Arc>, + pub(crate) conn: Arc, /// This channel is used to send a stop signal to the read loop. stop_chan: (Sender>, Receiver>), + /// The Configuration for the P2P network. + config: Arc, + /// Managing spawned tasks. task_group: TaskGroup, } impl Peer { /// Creates a new peer - pub fn new( + pub(crate) fn new( + own_id: PeerID, peer_pool: Weak, - id: &PeerID, - conn: Conn, - remote_endpoint: Endpoint, - conn_direction: ConnDirection, + conn: Arc, + config: Arc, ex: Executor, ) -> Arc { Arc::new(Peer { - id: id.clone(), + own_id, + id: RwLock::new(None), peer_pool, conn, - protocol_ids: RwLock::new(Vec::new()), - remote_endpoint, - conn_direction, - protocol_events: EventSys::new(), + config, task_group: TaskGroup::with_executor(ex), stop_chan: async_channel::bounded(1), }) } - /// Run the peer - pub async fn run(self: Arc) -> Result<()> { - self.start_protocols().await; - self.read_loop().await + /// Send a msg to this peer connection using the specified protocol. + pub async fn send(&self, proto_id: ProtocolID, msg: T) -> Result<()> { + self.conn.send(proto_id, msg).await } - /// Send a message to the peer connection using the specified protocol. - pub async fn send(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { - let payload = encode(msg)?; - - let proto_msg = ProtocolMsg { - protocol_id: protocol_id.to_string(), - payload: payload.to_vec(), - }; - - self.conn - .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?) - .await?; - Ok(()) + /// Receives a new msg from this peer connection. + pub async fn recv(&self) -> Result { + self.conn.recv::

().await } /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast(&self, protocol_id: &ProtocolID, msg: &T) { - self.peer_pool().broadcast(protocol_id, msg).await; + pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { + self.peer_pool().broadcast(proto_id, msg).await; } - /// Shuts down the peer - pub async fn shutdown(&self) { - trace!("peer {} start shutting down", self.id); - - // Send shutdown event to all protocols - for protocol_id in self.protocol_ids.read().await.iter() { - self.protocol_events - .emit_by_topic(protocol_id, &ProtocolEvent::Shutdown) - .await; - } + /// Returns the peer's ID + pub fn id(&self) -> Option { + self.id.read().clone() + } - // Send a stop signal to the read loop - // - // No need to handle the error here; a dropped channel and - // sending a stop signal have the same effect. - let _ = self.stop_chan.0.try_send(Ok(())); + /// Returns own ID + pub fn own_id(&self) -> &PeerID { + &self.own_id + } - // No need to handle the error here - let shutdown_msg = - NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0)).expect("pack shutdown message"); - let _ = self.conn.send(shutdown_msg).await; + /// Returns the [`Config`] + pub fn config(&self) -> Arc { + self.config.clone() + } - // Force shutting down - self.task_group.cancel().await; + /// Returns the remote endpoint for the peer + pub fn remote_endpoint(&self) -> &Endpoint { + &self.conn.remote_endpoint } /// Check if the connection is Inbound - #[inline] pub fn is_inbound(&self) -> bool { - match self.conn_direction { + match self.conn.direction { ConnDirection::Inbound => true, ConnDirection::Outbound => false, } @@ -140,40 +113,82 @@ impl Peer { /// Returns the direction of the connection, which can be either `Inbound` /// or `Outbound`. - #[inline] pub fn direction(&self) -> &ConnDirection { - &self.conn_direction + &self.conn.direction } - /// Returns the remote endpoint for the peer - #[inline] - pub fn remote_endpoint(&self) -> &Endpoint { - &self.remote_endpoint + pub(crate) async fn init(self: &Arc) -> Result<()> { + let handshake_protocol = HandshakeProtocol::new( + self.clone(), + self.peer_pool().protocol_versions.read().await.clone(), + ); + + let pid = handshake_protocol.init().await?; + *self.id.write() = Some(pid); + + Ok(()) } - /// Return the peer's ID - #[inline] - pub fn id(&self) -> &PeerID { - &self.id + /// Run the peer + pub(crate) async fn run(self: Arc) -> Result<()> { + self.run_connect_protocols().await; + self.read_loop().await } - /// Returns the `Config` instance. - pub fn config(&self) -> Arc { - self.peer_pool().config.clone() + /// Shuts down the peer + pub(crate) async fn shutdown(self: &Arc) -> Result<()> { + trace!("peer {:?} shutting down", self.id()); + + // Send shutdown event to the attached protocols + for proto_id in self.peer_pool().protocols.read().await.keys() { + let _ = self.conn.emit_msg(proto_id, &ProtocolEvent::Shutdown).await; + } + + // Send a stop signal to the read loop + // + // No need to handle the error here; a dropped channel and + // sendig a stop signal have the same effect. + let _ = self.stop_chan.0.try_send(Ok(())); + + self.conn.disconnect(Ok(())).await?; + + // Force shutting down + self.task_group.cancel().await; + Ok(()) } - /// Registers a listener for the given Protocol `P`. - pub async fn register_listener(&self) -> EventListener { - self.protocol_events.register(&P::id()).await + /// Run running the Connect Protocols for this peer connection. + async fn run_connect_protocols(self: &Arc) { + for (proto_id, constructor) in self.peer_pool().protocols.read().await.iter() { + trace!("peer {:?} run protocol {proto_id}", self.id()); + + let protocol = constructor(self.clone()); + + let on_failure = { + let this = self.clone(); + let proto_id = proto_id.clone(); + |result: TaskResult>| async move { + if let TaskResult::Completed(res) = result { + if res.is_err() { + error!("protocol {} stopped", proto_id); + } + // Send a stop signal to read loop + let _ = this.stop_chan.0.try_send(res); + } + } + }; + + self.task_group.spawn(protocol.start(), on_failure); + } } - /// Start a read loop to handle incoming messages from the peer connection. + /// Run a read loop to handle incoming messages from the peer connection. async fn read_loop(&self) -> Result<()> { loop { - let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await; + let fut = select(self.stop_chan.1.recv(), self.conn.recv_inner()).await; let result = match fut { Either::Left(stop_signal) => { - trace!("Peer {} received a stop signal", self.id); + trace!("Peer {:?} received a stop signal", self.id()); return stop_signal?; } Either::Right(result) => result, @@ -184,14 +199,9 @@ impl Peer { match msg.header.command { NetMsgCmd::Protocol => { let msg: ProtocolMsg = decode(&msg.payload)?.0; - - if !self.protocol_ids.read().await.contains(&msg.protocol_id) { - return Err(Error::UnsupportedProtocol(msg.protocol_id)); - } - - let proto_id = &msg.protocol_id; - let msg = ProtocolEvent::Message(msg.payload); - self.protocol_events.emit_by_topic(proto_id, &msg).await; + self.conn + .emit_msg(&msg.protocol_id, &ProtocolEvent::Message(msg.payload)) + .await?; } NetMsgCmd::Shutdown => { return Err(Error::PeerShutdown); @@ -201,32 +211,7 @@ impl Peer { } } - /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc) { - for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { - trace!("peer {} start protocol {protocol_id}", self.id); - let protocol = constructor(self.clone()); - - self.protocol_ids.write().await.push(protocol_id.clone()); - - let on_failure = { - let this = self.clone(); - let protocol_id = protocol_id.clone(); - |result: TaskResult>| async move { - if let TaskResult::Completed(res) = result { - if res.is_err() { - error!("protocol {} stopped", protocol_id); - } - // Send a stop signal to read loop - let _ = this.stop_chan.0.try_send(res); - } - } - }; - - self.task_group.spawn(protocol.start(), on_failure); - } - } - + /// Returns `PeerPool` pointer fn peer_pool(&self) -> Arc { self.peer_pool.upgrade().unwrap() } -- cgit v1.2.3