aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/conn_queue.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
commite15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch)
tree7976f6993e4f6b3646f5bd6954189346d5ffd330 /p2p/src/conn_queue.rs
parent6c65232d741229635151671708556b9af7ef75ac (diff)
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'p2p/src/conn_queue.rs')
-rw-r--r--p2p/src/conn_queue.rs53
1 files changed, 16 insertions, 37 deletions
diff --git a/p2p/src/conn_queue.rs b/p2p/src/conn_queue.rs
index 9a153f3..1b6ef98 100644
--- a/p2p/src/conn_queue.rs
+++ b/p2p/src/conn_queue.rs
@@ -1,37 +1,13 @@
-use std::{collections::VecDeque, fmt, sync::Arc};
-
-use async_channel::Sender;
+use std::{collections::VecDeque, sync::Arc};
use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
use karyon_net::Conn;
-use crate::{message::NetMsg, Result};
-
-/// Defines the direction of a network connection.
-#[derive(Clone, Debug)]
-pub enum ConnDirection {
- Inbound,
- Outbound,
-}
-
-impl fmt::Display for ConnDirection {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- ConnDirection::Inbound => write!(f, "Inbound"),
- ConnDirection::Outbound => write!(f, "Outbound"),
- }
- }
-}
-
-pub struct NewConn {
- pub direction: ConnDirection,
- pub conn: Conn<NetMsg>,
- pub disconnect_signal: Sender<Result<()>>,
-}
+use crate::{connection::ConnDirection, connection::Connection, message::NetMsg, Result};
/// Connection queue
pub struct ConnQueue {
- queue: Mutex<VecDeque<NewConn>>,
+ queue: Mutex<VecDeque<Connection>>,
conn_available: CondVar,
}
@@ -43,24 +19,27 @@ impl ConnQueue {
})
}
- /// Push a connection into the queue and wait for the disconnect signal
+ /// Handle a connection by pushing it into the queue and wait for the disconnect signal
pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> {
- let (disconnect_signal, chan) = async_channel::bounded(1);
- let new_conn = NewConn {
- direction,
- conn,
- disconnect_signal,
- };
+ let endpoint = conn.peer_endpoint()?;
+
+ let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
+ let new_conn = Connection::new(conn, disconnect_tx, direction, endpoint);
+
+ // Push a new conn to the queue
self.queue.lock().await.push_back(new_conn);
self.conn_available.signal();
- if let Ok(result) = chan.recv().await {
+
+ // Wait for the disconnect signal from the connection handler
+ if let Ok(result) = disconnect_rx.recv().await {
return result;
}
+
Ok(())
}
- /// Receive the next connection in the queue
- pub async fn next(&self) -> NewConn {
+ /// Waits for the next connection in the queue
+ pub async fn next(&self) -> Connection {
let mut queue = self.queue.lock().await;
while queue.is_empty() {
queue = self.conn_available.wait(queue).await;