From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- p2p/src/conn_queue.rs | 53 ++++++++++++++++----------------------------------- 1 file changed, 16 insertions(+), 37 deletions(-) (limited to 'p2p/src/conn_queue.rs') diff --git a/p2p/src/conn_queue.rs b/p2p/src/conn_queue.rs index 9a153f3..1b6ef98 100644 --- a/p2p/src/conn_queue.rs +++ b/p2p/src/conn_queue.rs @@ -1,37 +1,13 @@ -use std::{collections::VecDeque, fmt, sync::Arc}; - -use async_channel::Sender; +use std::{collections::VecDeque, sync::Arc}; use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; use karyon_net::Conn; -use crate::{message::NetMsg, Result}; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { - Inbound, - Outbound, -} - -impl fmt::Display for ConnDirection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - ConnDirection::Inbound => write!(f, "Inbound"), - ConnDirection::Outbound => write!(f, "Outbound"), - } - } -} - -pub struct NewConn { - pub direction: ConnDirection, - pub conn: Conn, - pub disconnect_signal: Sender>, -} +use crate::{connection::ConnDirection, connection::Connection, message::NetMsg, Result}; /// Connection queue pub struct ConnQueue { - queue: Mutex>, + queue: Mutex>, conn_available: CondVar, } @@ -43,24 +19,27 @@ impl ConnQueue { }) } - /// Push a connection into the queue and wait for the disconnect signal + /// Handle a connection by pushing it into the queue and wait for the disconnect signal pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> { - let (disconnect_signal, chan) = async_channel::bounded(1); - let new_conn = NewConn { - direction, - conn, - disconnect_signal, - }; + let endpoint = conn.peer_endpoint()?; + + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + let new_conn = Connection::new(conn, disconnect_tx, direction, endpoint); + + // Push a new conn to the queue self.queue.lock().await.push_back(new_conn); self.conn_available.signal(); - if let Ok(result) = chan.recv().await { + + // Wait for the disconnect signal from the connection handler + if let Ok(result) = disconnect_rx.recv().await { return result; } + Ok(()) } - /// Receive the next connection in the queue - pub async fn next(&self) -> NewConn { + /// Waits for the next connection in the queue + pub async fn next(&self) -> Connection { let mut queue = self.queue.lock().await; while queue.is_empty() { queue = self.conn_available.wait(queue).await; -- cgit v1.2.3