diff options
Diffstat (limited to 'p2p/src/conn_queue.rs')
-rw-r--r-- | p2p/src/conn_queue.rs | 53 |
1 files changed, 16 insertions, 37 deletions
diff --git a/p2p/src/conn_queue.rs b/p2p/src/conn_queue.rs index 9a153f3..1b6ef98 100644 --- a/p2p/src/conn_queue.rs +++ b/p2p/src/conn_queue.rs @@ -1,37 +1,13 @@ -use std::{collections::VecDeque, fmt, sync::Arc}; - -use async_channel::Sender; +use std::{collections::VecDeque, sync::Arc}; use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; use karyon_net::Conn; -use crate::{message::NetMsg, Result}; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { - Inbound, - Outbound, -} - -impl fmt::Display for ConnDirection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - ConnDirection::Inbound => write!(f, "Inbound"), - ConnDirection::Outbound => write!(f, "Outbound"), - } - } -} - -pub struct NewConn { - pub direction: ConnDirection, - pub conn: Conn<NetMsg>, - pub disconnect_signal: Sender<Result<()>>, -} +use crate::{connection::ConnDirection, connection::Connection, message::NetMsg, Result}; /// Connection queue pub struct ConnQueue { - queue: Mutex<VecDeque<NewConn>>, + queue: Mutex<VecDeque<Connection>>, conn_available: CondVar, } @@ -43,24 +19,27 @@ impl ConnQueue { }) } - /// Push a connection into the queue and wait for the disconnect signal + /// Handle a connection by pushing it into the queue and wait for the disconnect signal pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> { - let (disconnect_signal, chan) = async_channel::bounded(1); - let new_conn = NewConn { - direction, - conn, - disconnect_signal, - }; + let endpoint = conn.peer_endpoint()?; + + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + let new_conn = Connection::new(conn, disconnect_tx, direction, endpoint); + + // Push a new conn to the queue self.queue.lock().await.push_back(new_conn); self.conn_available.signal(); - if let Ok(result) = chan.recv().await { + + // Wait for the disconnect signal from the connection handler + if let Ok(result) = disconnect_rx.recv().await { return result; } + Ok(()) } - /// Receive the next connection in the queue - pub async fn next(&self) -> NewConn { + /// Waits for the next connection in the queue + pub async fn next(&self) -> Connection { let mut queue = self.queue.lock().await; while queue.is_empty() { queue = self.conn_available.wait(queue).await; |