diff options
Diffstat (limited to 'p2p/src/conn_queue.rs')
| -rw-r--r-- | p2p/src/conn_queue.rs | 53 | 
1 files changed, 16 insertions, 37 deletions
diff --git a/p2p/src/conn_queue.rs b/p2p/src/conn_queue.rs index 9a153f3..1b6ef98 100644 --- a/p2p/src/conn_queue.rs +++ b/p2p/src/conn_queue.rs @@ -1,37 +1,13 @@ -use std::{collections::VecDeque, fmt, sync::Arc}; - -use async_channel::Sender; +use std::{collections::VecDeque, sync::Arc};  use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};  use karyon_net::Conn; -use crate::{message::NetMsg, Result}; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { -    Inbound, -    Outbound, -} - -impl fmt::Display for ConnDirection { -    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { -        match self { -            ConnDirection::Inbound => write!(f, "Inbound"), -            ConnDirection::Outbound => write!(f, "Outbound"), -        } -    } -} - -pub struct NewConn { -    pub direction: ConnDirection, -    pub conn: Conn<NetMsg>, -    pub disconnect_signal: Sender<Result<()>>, -} +use crate::{connection::ConnDirection, connection::Connection, message::NetMsg, Result};  /// Connection queue  pub struct ConnQueue { -    queue: Mutex<VecDeque<NewConn>>, +    queue: Mutex<VecDeque<Connection>>,      conn_available: CondVar,  } @@ -43,24 +19,27 @@ impl ConnQueue {          })      } -    /// Push a connection into the queue and wait for the disconnect signal +    /// Handle a connection by pushing it into the queue and wait for the disconnect signal      pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> { -        let (disconnect_signal, chan) = async_channel::bounded(1); -        let new_conn = NewConn { -            direction, -            conn, -            disconnect_signal, -        }; +        let endpoint = conn.peer_endpoint()?; + +        let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); +        let new_conn = Connection::new(conn, disconnect_tx, direction, endpoint); + +        // Push a new conn to the queue          self.queue.lock().await.push_back(new_conn);          self.conn_available.signal(); -        if let Ok(result) = chan.recv().await { + +        // Wait for the disconnect signal from the connection handler +        if let Ok(result) = disconnect_rx.recv().await {              return result;          } +          Ok(())      } -    /// Receive the next connection in the queue -    pub async fn next(&self) -> NewConn { +    /// Waits for the next connection in the queue +    pub async fn next(&self) -> Connection {          let mut queue = self.queue.lock().await;          while queue.is_empty() {              queue = self.conn_available.wait(queue).await;  | 
