aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
commite15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch)
tree7976f6993e4f6b3646f5bd6954189346d5ffd330 /p2p/src/peer/mod.rs
parent6c65232d741229635151671708556b9af7ef75ac (diff)
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'p2p/src/peer/mod.rs')
-rw-r--r--p2p/src/peer/mod.rs247
1 files changed, 116 insertions, 131 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 6903294..a5ac7ad 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -1,138 +1,111 @@
mod peer_id;
-pub use peer_id::PeerID;
-
use std::sync::{Arc, Weak};
use async_channel::{Receiver, Sender};
-use bincode::{Decode, Encode};
+use bincode::Encode;
use log::{error, trace};
+use parking_lot::RwLock;
use karyon_core::{
- async_runtime::{lock::RwLock, Executor},
+ async_runtime::Executor,
async_util::{select, Either, TaskGroup, TaskResult},
- event::{EventListener, EventSys},
- util::{decode, encode},
+ util::decode,
};
-use karyon_net::{Conn, Endpoint};
-
use crate::{
- conn_queue::ConnDirection,
- message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
+ connection::{ConnDirection, Connection},
+ endpoint::Endpoint,
+ message::{NetMsgCmd, ProtocolMsg},
peer_pool::PeerPool,
- protocol::{Protocol, ProtocolEvent, ProtocolID},
+ protocol::{InitProtocol, Protocol, ProtocolEvent, ProtocolID},
+ protocols::HandshakeProtocol,
Config, Error, Result,
};
+pub use peer_id::PeerID;
+
pub struct Peer {
+ /// Own ID
+ own_id: PeerID,
+
/// Peer's ID
- id: PeerID,
+ id: RwLock<Option<PeerID>>,
- /// A weak pointer to `PeerPool`
+ /// A weak pointer to [`PeerPool`]
peer_pool: Weak<PeerPool>,
/// Holds the peer connection
- conn: Conn<NetMsg>,
-
- /// Remote endpoint for the peer
- remote_endpoint: Endpoint,
-
- /// The direction of the connection, either `Inbound` or `Outbound`
- conn_direction: ConnDirection,
-
- /// A list of protocol IDs
- protocol_ids: RwLock<Vec<ProtocolID>>,
-
- /// `EventSys` responsible for sending events to the protocols.
- protocol_events: Arc<EventSys<ProtocolID>>,
+ pub(crate) conn: Arc<Connection>,
/// This channel is used to send a stop signal to the read loop.
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
+ /// The Configuration for the P2P network.
+ config: Arc<Config>,
+
/// Managing spawned tasks.
task_group: TaskGroup,
}
impl Peer {
/// Creates a new peer
- pub fn new(
+ pub(crate) fn new(
+ own_id: PeerID,
peer_pool: Weak<PeerPool>,
- id: &PeerID,
- conn: Conn<NetMsg>,
- remote_endpoint: Endpoint,
- conn_direction: ConnDirection,
+ conn: Arc<Connection>,
+ config: Arc<Config>,
ex: Executor,
) -> Arc<Peer> {
Arc::new(Peer {
- id: id.clone(),
+ own_id,
+ id: RwLock::new(None),
peer_pool,
conn,
- protocol_ids: RwLock::new(Vec::new()),
- remote_endpoint,
- conn_direction,
- protocol_events: EventSys::new(),
+ config,
task_group: TaskGroup::with_executor(ex),
stop_chan: async_channel::bounded(1),
})
}
- /// Run the peer
- pub async fn run(self: Arc<Self>) -> Result<()> {
- self.start_protocols().await;
- self.read_loop().await
+ /// Send a msg to this peer connection using the specified protocol.
+ pub async fn send<T: Encode>(&self, proto_id: ProtocolID, msg: T) -> Result<()> {
+ self.conn.send(proto_id, msg).await
}
- /// Send a message to the peer connection using the specified protocol.
- pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {
- let payload = encode(msg)?;
-
- let proto_msg = ProtocolMsg {
- protocol_id: protocol_id.to_string(),
- payload: payload.to_vec(),
- };
-
- self.conn
- .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?)
- .await?;
- Ok(())
+ /// Receives a new msg from this peer connection.
+ pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
+ self.conn.recv::<P>().await
}
/// Broadcast a message to all connected peers using the specified protocol.
- pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) {
- self.peer_pool().broadcast(protocol_id, msg).await;
+ pub async fn broadcast<T: Encode>(&self, proto_id: &ProtocolID, msg: &T) {
+ self.peer_pool().broadcast(proto_id, msg).await;
}
- /// Shuts down the peer
- pub async fn shutdown(&self) {
- trace!("peer {} start shutting down", self.id);
-
- // Send shutdown event to all protocols
- for protocol_id in self.protocol_ids.read().await.iter() {
- self.protocol_events
- .emit_by_topic(protocol_id, &ProtocolEvent::Shutdown)
- .await;
- }
+ /// Returns the peer's ID
+ pub fn id(&self) -> Option<PeerID> {
+ self.id.read().clone()
+ }
- // Send a stop signal to the read loop
- //
- // No need to handle the error here; a dropped channel and
- // sending a stop signal have the same effect.
- let _ = self.stop_chan.0.try_send(Ok(()));
+ /// Returns own ID
+ pub fn own_id(&self) -> &PeerID {
+ &self.own_id
+ }
- // No need to handle the error here
- let shutdown_msg =
- NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0)).expect("pack shutdown message");
- let _ = self.conn.send(shutdown_msg).await;
+ /// Returns the [`Config`]
+ pub fn config(&self) -> Arc<Config> {
+ self.config.clone()
+ }
- // Force shutting down
- self.task_group.cancel().await;
+ /// Returns the remote endpoint for the peer
+ pub fn remote_endpoint(&self) -> &Endpoint {
+ &self.conn.remote_endpoint
}
/// Check if the connection is Inbound
- #[inline]
pub fn is_inbound(&self) -> bool {
- match self.conn_direction {
+ match self.conn.direction {
ConnDirection::Inbound => true,
ConnDirection::Outbound => false,
}
@@ -140,40 +113,82 @@ impl Peer {
/// Returns the direction of the connection, which can be either `Inbound`
/// or `Outbound`.
- #[inline]
pub fn direction(&self) -> &ConnDirection {
- &self.conn_direction
+ &self.conn.direction
}
- /// Returns the remote endpoint for the peer
- #[inline]
- pub fn remote_endpoint(&self) -> &Endpoint {
- &self.remote_endpoint
+ pub(crate) async fn init(self: &Arc<Self>) -> Result<()> {
+ let handshake_protocol = HandshakeProtocol::new(
+ self.clone(),
+ self.peer_pool().protocol_versions.read().await.clone(),
+ );
+
+ let pid = handshake_protocol.init().await?;
+ *self.id.write() = Some(pid);
+
+ Ok(())
}
- /// Return the peer's ID
- #[inline]
- pub fn id(&self) -> &PeerID {
- &self.id
+ /// Run the peer
+ pub(crate) async fn run(self: Arc<Self>) -> Result<()> {
+ self.run_connect_protocols().await;
+ self.read_loop().await
}
- /// Returns the `Config` instance.
- pub fn config(&self) -> Arc<Config> {
- self.peer_pool().config.clone()
+ /// Shuts down the peer
+ pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<()> {
+ trace!("peer {:?} shutting down", self.id());
+
+ // Send shutdown event to the attached protocols
+ for proto_id in self.peer_pool().protocols.read().await.keys() {
+ let _ = self.conn.emit_msg(proto_id, &ProtocolEvent::Shutdown).await;
+ }
+
+ // Send a stop signal to the read loop
+ //
+ // No need to handle the error here; a dropped channel and
+ // sendig a stop signal have the same effect.
+ let _ = self.stop_chan.0.try_send(Ok(()));
+
+ self.conn.disconnect(Ok(())).await?;
+
+ // Force shutting down
+ self.task_group.cancel().await;
+ Ok(())
}
- /// Registers a listener for the given Protocol `P`.
- pub async fn register_listener<P: Protocol>(&self) -> EventListener<ProtocolID, ProtocolEvent> {
- self.protocol_events.register(&P::id()).await
+ /// Run running the Connect Protocols for this peer connection.
+ async fn run_connect_protocols(self: &Arc<Self>) {
+ for (proto_id, constructor) in self.peer_pool().protocols.read().await.iter() {
+ trace!("peer {:?} run protocol {proto_id}", self.id());
+
+ let protocol = constructor(self.clone());
+
+ let on_failure = {
+ let this = self.clone();
+ let proto_id = proto_id.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(res) = result {
+ if res.is_err() {
+ error!("protocol {} stopped", proto_id);
+ }
+ // Send a stop signal to read loop
+ let _ = this.stop_chan.0.try_send(res);
+ }
+ }
+ };
+
+ self.task_group.spawn(protocol.start(), on_failure);
+ }
}
- /// Start a read loop to handle incoming messages from the peer connection.
+ /// Run a read loop to handle incoming messages from the peer connection.
async fn read_loop(&self) -> Result<()> {
loop {
- let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await;
+ let fut = select(self.stop_chan.1.recv(), self.conn.recv_inner()).await;
let result = match fut {
Either::Left(stop_signal) => {
- trace!("Peer {} received a stop signal", self.id);
+ trace!("Peer {:?} received a stop signal", self.id());
return stop_signal?;
}
Either::Right(result) => result,
@@ -184,14 +199,9 @@ impl Peer {
match msg.header.command {
NetMsgCmd::Protocol => {
let msg: ProtocolMsg = decode(&msg.payload)?.0;
-
- if !self.protocol_ids.read().await.contains(&msg.protocol_id) {
- return Err(Error::UnsupportedProtocol(msg.protocol_id));
- }
-
- let proto_id = &msg.protocol_id;
- let msg = ProtocolEvent::Message(msg.payload);
- self.protocol_events.emit_by_topic(proto_id, &msg).await;
+ self.conn
+ .emit_msg(&msg.protocol_id, &ProtocolEvent::Message(msg.payload))
+ .await?;
}
NetMsgCmd::Shutdown => {
return Err(Error::PeerShutdown);
@@ -201,32 +211,7 @@ impl Peer {
}
}
- /// Start running the protocols for this peer connection.
- async fn start_protocols(self: &Arc<Self>) {
- for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() {
- trace!("peer {} start protocol {protocol_id}", self.id);
- let protocol = constructor(self.clone());
-
- self.protocol_ids.write().await.push(protocol_id.clone());
-
- let on_failure = {
- let this = self.clone();
- let protocol_id = protocol_id.clone();
- |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(res) = result {
- if res.is_err() {
- error!("protocol {} stopped", protocol_id);
- }
- // Send a stop signal to read loop
- let _ = this.stop_chan.0.try_send(res);
- }
- }
- };
-
- self.task_group.spawn(protocol.start(), on_failure);
- }
- }
-
+ /// Returns `PeerPool` pointer
fn peer_pool(&self) -> Arc<PeerPool> {
self.peer_pool.upgrade().unwrap()
}