aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/connection.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-25 07:15:02 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-25 07:15:02 +0200
commite022b2b77e3e98f0f444f637f96e74e6a6f990cf (patch)
tree1d599938f8d97eaa9a6cbdc8ff83eefeae966201 /p2p/src/connection.rs
parent998568ab76cc8ba36fe47d5fca17bcc997aa391c (diff)
p2p: rename connection.rs to conn_queue.rs
Diffstat (limited to 'p2p/src/connection.rs')
-rw-r--r--p2p/src/connection.rs70
1 files changed, 0 insertions, 70 deletions
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
deleted file mode 100644
index 9a153f3..0000000
--- a/p2p/src/connection.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::{collections::VecDeque, fmt, sync::Arc};
-
-use async_channel::Sender;
-
-use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
-use karyon_net::Conn;
-
-use crate::{message::NetMsg, Result};
-
-/// Defines the direction of a network connection.
-#[derive(Clone, Debug)]
-pub enum ConnDirection {
- Inbound,
- Outbound,
-}
-
-impl fmt::Display for ConnDirection {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- ConnDirection::Inbound => write!(f, "Inbound"),
- ConnDirection::Outbound => write!(f, "Outbound"),
- }
- }
-}
-
-pub struct NewConn {
- pub direction: ConnDirection,
- pub conn: Conn<NetMsg>,
- pub disconnect_signal: Sender<Result<()>>,
-}
-
-/// Connection queue
-pub struct ConnQueue {
- queue: Mutex<VecDeque<NewConn>>,
- conn_available: CondVar,
-}
-
-impl ConnQueue {
- pub fn new() -> Arc<Self> {
- Arc::new(Self {
- queue: Mutex::new(VecDeque::new()),
- conn_available: CondVar::new(),
- })
- }
-
- /// Push a connection into the queue and wait for the disconnect signal
- pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> {
- let (disconnect_signal, chan) = async_channel::bounded(1);
- let new_conn = NewConn {
- direction,
- conn,
- disconnect_signal,
- };
- self.queue.lock().await.push_back(new_conn);
- self.conn_available.signal();
- if let Ok(result) = chan.recv().await {
- return result;
- }
- Ok(())
- }
-
- /// Receive the next connection in the queue
- pub async fn next(&self) -> NewConn {
- let mut queue = self.queue.lock().await;
- while queue.is_empty() {
- queue = self.conn_available.wait(queue).await;
- }
- queue.pop_front().unwrap()
- }
-}