aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/net/connection_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/net/connection_queue.rs')
-rw-r--r--p2p/src/net/connection_queue.rs52
1 files changed, 52 insertions, 0 deletions
diff --git a/p2p/src/net/connection_queue.rs b/p2p/src/net/connection_queue.rs
new file mode 100644
index 0000000..fbc4bfc
--- /dev/null
+++ b/p2p/src/net/connection_queue.rs
@@ -0,0 +1,52 @@
+use std::sync::Arc;
+
+use smol::{channel::Sender, lock::Mutex};
+
+use karyons_core::async_utils::CondVar;
+
+use karyons_net::Conn;
+
+use crate::net::ConnDirection;
+
+pub struct NewConn {
+ pub direction: ConnDirection,
+ pub conn: Conn,
+ pub disconnect_signal: Sender<()>,
+}
+
+/// Connection queue
+pub struct ConnQueue {
+ queue: Mutex<Vec<NewConn>>,
+ conn_available: CondVar,
+}
+
+impl ConnQueue {
+ pub fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(Vec::new()),
+ conn_available: CondVar::new(),
+ })
+ }
+
+ /// Push a connection into the queue and wait for the disconnect signal
+ pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
+ let (disconnect_signal, chan) = smol::channel::bounded(1);
+ let new_conn = NewConn {
+ direction,
+ conn,
+ disconnect_signal,
+ };
+ self.queue.lock().await.push(new_conn);
+ self.conn_available.signal();
+ let _ = chan.recv().await;
+ }
+
+ /// Receive the next connection in the queue
+ pub async fn next(&self) -> NewConn {
+ let mut queue = self.queue.lock().await;
+ while queue.is_empty() {
+ queue = self.conn_available.wait(queue).await;
+ }
+ queue.pop().unwrap()
+ }
+}