From 5d91ead06c62fd7c3cd846659b935012616ce5ae Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 12 Nov 2023 11:26:38 +0300 Subject: p2p: remove net directory --- p2p/src/backend.rs | 2 +- p2p/src/connection.rs | 65 ++++++++++++++++++ p2p/src/connector.rs | 124 +++++++++++++++++++++++++++++++++++ p2p/src/discovery/lookup.rs | 4 +- p2p/src/discovery/mod.rs | 6 +- p2p/src/lib.rs | 5 +- p2p/src/listener.rs | 141 +++++++++++++++++++++++++++++++++++++++ p2p/src/net/connection_queue.rs | 52 --------------- p2p/src/net/connector.rs | 125 ----------------------------------- p2p/src/net/listener.rs | 142 ---------------------------------------- p2p/src/net/mod.rs | 27 -------- p2p/src/net/slots.rs | 54 --------------- p2p/src/peer/mod.rs | 2 +- p2p/src/peer_pool.rs | 3 +- p2p/src/slots.rs | 54 +++++++++++++++ 15 files changed, 398 insertions(+), 408 deletions(-) create mode 100644 p2p/src/connection.rs create mode 100644 p2p/src/connector.rs create mode 100644 p2p/src/listener.rs delete mode 100644 p2p/src/net/connection_queue.rs delete mode 100644 p2p/src/net/connector.rs delete mode 100644 p2p/src/net/listener.rs delete mode 100644 p2p/src/net/mod.rs delete mode 100644 p2p/src/net/slots.rs create mode 100644 p2p/src/slots.rs (limited to 'p2p') diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 290e3e7..bb18f06 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -6,9 +6,9 @@ use karyons_core::{pubsub::Subscription, Executor}; use crate::{ config::Config, + connection::ConnQueue, discovery::{ArcDiscovery, Discovery}, monitor::{Monitor, MonitorEvent}, - net::ConnQueue, peer_pool::PeerPool, protocol::{ArcProtocol, Protocol}, ArcPeer, PeerID, Result, diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs new file mode 100644 index 0000000..7c81aa2 --- /dev/null +++ b/p2p/src/connection.rs @@ -0,0 +1,65 @@ +use smol::{channel::Sender, lock::Mutex}; +use std::{collections::VecDeque, fmt, sync::Arc}; + +use karyons_core::async_utils::CondVar; + +use karyons_net::Conn; + +/// Defines the direction of a network connection. +#[derive(Clone, Debug)] +pub enum ConnDirection { + Inbound, + Outbound, +} + +impl fmt::Display for ConnDirection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ConnDirection::Inbound => write!(f, "Inbound"), + ConnDirection::Outbound => write!(f, "Outbound"), + } + } +} + +pub struct NewConn { + pub direction: ConnDirection, + pub conn: Conn, + pub disconnect_signal: Sender<()>, +} + +/// Connection queue +pub struct ConnQueue { + queue: Mutex>, + conn_available: CondVar, +} + +impl ConnQueue { + pub fn new() -> Arc { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + conn_available: CondVar::new(), + }) + } + + /// Push a connection into the queue and wait for the disconnect signal + pub async fn handle(&self, conn: Conn, direction: ConnDirection) { + let (disconnect_signal, chan) = smol::channel::bounded(1); + let new_conn = NewConn { + direction, + conn, + disconnect_signal, + }; + self.queue.lock().await.push_back(new_conn); + self.conn_available.signal(); + let _ = chan.recv().await; + } + + /// Receive the next connection in the queue + pub async fn next(&self) -> NewConn { + let mut queue = self.queue.lock().await; + while queue.is_empty() { + queue = self.conn_available.wait(queue).await; + } + queue.pop_front().unwrap() + } +} diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs new file mode 100644 index 0000000..3932c41 --- /dev/null +++ b/p2p/src/connector.rs @@ -0,0 +1,124 @@ +use std::{future::Future, sync::Arc}; + +use log::{trace, warn}; + +use karyons_core::{ + async_utils::{Backoff, TaskGroup, TaskResult}, + Executor, +}; +use karyons_net::{dial, Conn, Endpoint, NetError}; + +use crate::{ + monitor::{ConnEvent, Monitor}, + slots::ConnectionSlots, + Result, +}; + +/// Responsible for creating outbound connections with other peers. +pub struct Connector { + /// Managing spawned tasks. + task_group: TaskGroup, + + /// Manages available outbound slots. + connection_slots: Arc, + + /// The maximum number of retries allowed before successfully + /// establishing a connection. + max_retries: usize, + + /// Responsible for network and system monitoring. + monitor: Arc, +} + +impl Connector { + /// Creates a new Connector + pub fn new( + max_retries: usize, + connection_slots: Arc, + monitor: Arc, + ) -> Arc { + Arc::new(Self { + task_group: TaskGroup::new(), + monitor, + connection_slots, + max_retries, + }) + } + + /// Shuts down the connector + pub async fn shutdown(&self) { + self.task_group.cancel().await; + } + + /// Establish a connection to the specified `endpoint`. If the connection + /// attempt fails, it performs a backoff and retries until the maximum allowed + /// number of retries is exceeded. On a successful connection, it returns a + /// `Conn` instance. + /// + /// This method will block until it finds an available slot. + pub async fn connect(&self, endpoint: &Endpoint) -> Result { + self.connection_slots.wait_for_slot().await; + self.connection_slots.add(); + + let mut retry = 0; + let backoff = Backoff::new(500, 2000); + while retry < self.max_retries { + let conn_result = dial(endpoint).await; + + if let Ok(conn) = conn_result { + self.monitor + .notify(&ConnEvent::Connected(endpoint.clone()).into()) + .await; + return Ok(conn); + } + + self.monitor + .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into()) + .await; + + backoff.sleep().await; + + warn!("try to reconnect {endpoint}"); + retry += 1; + } + + self.monitor + .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into()) + .await; + + self.connection_slots.remove().await; + Err(NetError::Timeout.into()) + } + + /// Establish a connection to the given `endpoint`. For each new connection, + /// it invokes the provided `callback`, and pass the connection to the callback. + pub async fn connect_with_cback<'a, Fut>( + self: &Arc, + ex: Executor<'a>, + endpoint: &Endpoint, + callback: impl FnOnce(Conn) -> Fut + Send + 'a, + ) -> Result<()> + where + Fut: Future> + Send + 'a, + { + let conn = self.connect(endpoint).await?; + + let selfc = self.clone(); + let endpoint = endpoint.clone(); + let on_disconnect = |res| async move { + if let TaskResult::Completed(Err(err)) = res { + trace!("Outbound connection dropped: {err}"); + } + selfc + .monitor + .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) + .await; + selfc.connection_slots.remove().await; + }; + + self.task_group + .spawn(ex.clone(), callback(conn), on_disconnect); + + Ok(()) + } +} diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index f404133..94da900 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -10,14 +10,16 @@ use karyons_core::{async_utils::timeout, utils::decode, Executor}; use karyons_net::{Conn, Endpoint}; use crate::{ + connector::Connector, io_codec::IOCodec, + listener::Listener, message::{ get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, ShutdownMsg, }, monitor::{ConnEvent, DiscoveryEvent, Monitor}, - net::{ConnectionSlots, Connector, Listener}, routing_table::RoutingTable, + slots::ConnectionSlots, utils::version_match, Config, Error, PeerID, Result, }; diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 94b350b..7b8e7dc 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -16,13 +16,15 @@ use karyons_net::{Conn, Endpoint}; use crate::{ config::Config, + connection::{ConnDirection, ConnQueue}, + connector::Connector, + listener::Listener, monitor::Monitor, - net::ConnQueue, - net::{ConnDirection, ConnectionSlots, Connector, Listener}, routing_table::{ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }, + slots::ConnectionSlots, Error, PeerID, Result, }; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 08ba059..3cf1e7c 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -1,14 +1,17 @@ mod backend; mod config; +mod connection; +mod connector; mod discovery; mod error; mod io_codec; +mod listener; mod message; -mod net; mod peer; mod peer_pool; mod protocols; mod routing_table; +mod slots; mod utils; /// Responsible for network and system monitoring. diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs new file mode 100644 index 0000000..ee92536 --- /dev/null +++ b/p2p/src/listener.rs @@ -0,0 +1,141 @@ +use std::{future::Future, sync::Arc}; + +use log::{error, info, trace}; + +use karyons_core::{ + async_utils::{TaskGroup, TaskResult}, + Executor, +}; + +use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; + +use crate::{ + monitor::{ConnEvent, Monitor}, + slots::ConnectionSlots, + Result, +}; + +/// Responsible for creating inbound connections with other peers. +pub struct Listener { + /// Managing spawned tasks. + task_group: TaskGroup, + + /// Manages available inbound slots. + connection_slots: Arc, + + /// Responsible for network and system monitoring. + monitor: Arc, +} + +impl Listener { + /// Creates a new Listener + pub fn new(connection_slots: Arc, monitor: Arc) -> Arc { + Arc::new(Self { + connection_slots, + task_group: TaskGroup::new(), + monitor, + }) + } + + /// Starts a listener on the given `endpoint`. For each incoming connection + /// that is accepted, it invokes the provided `callback`, and pass the + /// connection to the callback. + /// + /// Returns the resloved listening endpoint. + pub async fn start<'a, Fut>( + self: &Arc, + ex: Executor<'a>, + endpoint: Endpoint, + // https://github.com/rust-lang/rfcs/pull/2132 + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + ) -> Result + where + Fut: Future> + Send + 'a, + { + let listener = match listen(&endpoint).await { + Ok(listener) => { + self.monitor + .notify(&ConnEvent::Listening(endpoint.clone()).into()) + .await; + listener + } + Err(err) => { + error!("Failed to listen on {endpoint}: {err}"); + self.monitor + .notify(&ConnEvent::ListenFailed(endpoint).into()) + .await; + return Err(err.into()); + } + }; + + let resolved_endpoint = listener.local_endpoint()?; + + info!("Start listening on {endpoint}"); + + let selfc = self.clone(); + self.task_group.spawn( + ex.clone(), + selfc.listen_loop(ex.clone(), listener, callback), + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + error!("Listen loop stopped: {endpoint} {err}"); + } + }, + ); + Ok(resolved_endpoint) + } + + /// Shuts down the listener + pub async fn shutdown(&self) { + self.task_group.cancel().await; + } + + async fn listen_loop<'a, Fut>( + self: Arc, + ex: Executor<'a>, + listener: Box, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + ) -> Result<()> + where + Fut: Future> + Send + 'a, + { + loop { + // Wait for an available inbound slot. + self.connection_slots.wait_for_slot().await; + let result = listener.accept().await; + + let conn = match result { + Ok(c) => { + self.monitor + .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) + .await; + c + } + Err(err) => { + error!("Failed to accept a new connection: {err}"); + self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + return Err(err.into()); + } + }; + + self.connection_slots.add(); + + let selfc = self.clone(); + let endpoint = conn.peer_endpoint()?; + let on_disconnect = |res| async move { + if let TaskResult::Completed(Err(err)) = res { + trace!("Inbound connection dropped: {err}"); + } + selfc + .monitor + .notify(&ConnEvent::Disconnected(endpoint).into()) + .await; + selfc.connection_slots.remove().await; + }; + + let callback = callback.clone(); + self.task_group + .spawn(ex.clone(), callback(conn), on_disconnect); + } + } +} diff --git a/p2p/src/net/connection_queue.rs b/p2p/src/net/connection_queue.rs deleted file mode 100644 index 4c0de28..0000000 --- a/p2p/src/net/connection_queue.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::{collections::VecDeque, sync::Arc}; - -use smol::{channel::Sender, lock::Mutex}; - -use karyons_core::async_utils::CondVar; - -use karyons_net::Conn; - -use crate::net::ConnDirection; - -pub struct NewConn { - pub direction: ConnDirection, - pub conn: Conn, - pub disconnect_signal: Sender<()>, -} - -/// Connection queue -pub struct ConnQueue { - queue: Mutex>, - conn_available: CondVar, -} - -impl ConnQueue { - pub fn new() -> Arc { - Arc::new(Self { - queue: Mutex::new(VecDeque::new()), - conn_available: CondVar::new(), - }) - } - - /// Push a connection into the queue and wait for the disconnect signal - pub async fn handle(&self, conn: Conn, direction: ConnDirection) { - let (disconnect_signal, chan) = smol::channel::bounded(1); - let new_conn = NewConn { - direction, - conn, - disconnect_signal, - }; - self.queue.lock().await.push_back(new_conn); - self.conn_available.signal(); - let _ = chan.recv().await; - } - - /// Receive the next connection in the queue - pub async fn next(&self) -> NewConn { - let mut queue = self.queue.lock().await; - while queue.is_empty() { - queue = self.conn_available.wait(queue).await; - } - queue.pop_front().unwrap() - } -} diff --git a/p2p/src/net/connector.rs b/p2p/src/net/connector.rs deleted file mode 100644 index 72dc0d8..0000000 --- a/p2p/src/net/connector.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{trace, warn}; - -use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, -}; -use karyons_net::{dial, Conn, Endpoint, NetError}; - -use crate::{ - monitor::{ConnEvent, Monitor}, - Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating outbound connections with other peers. -pub struct Connector { - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Manages available outbound slots. - connection_slots: Arc, - - /// The maximum number of retries allowed before successfully - /// establishing a connection. - max_retries: usize, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl Connector { - /// Creates a new Connector - pub fn new( - max_retries: usize, - connection_slots: Arc, - monitor: Arc, - ) -> Arc { - Arc::new(Self { - task_group: TaskGroup::new(), - monitor, - connection_slots, - max_retries, - }) - } - - /// Shuts down the connector - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - /// Establish a connection to the specified `endpoint`. If the connection - /// attempt fails, it performs a backoff and retries until the maximum allowed - /// number of retries is exceeded. On a successful connection, it returns a - /// `Conn` instance. - /// - /// This method will block until it finds an available slot. - pub async fn connect(&self, endpoint: &Endpoint) -> Result { - self.connection_slots.wait_for_slot().await; - self.connection_slots.add(); - - let mut retry = 0; - let backoff = Backoff::new(500, 2000); - while retry < self.max_retries { - let conn_result = dial(endpoint).await; - - if let Ok(conn) = conn_result { - self.monitor - .notify(&ConnEvent::Connected(endpoint.clone()).into()) - .await; - return Ok(conn); - } - - self.monitor - .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into()) - .await; - - backoff.sleep().await; - - warn!("try to reconnect {endpoint}"); - retry += 1; - } - - self.monitor - .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into()) - .await; - - self.connection_slots.remove().await; - Err(NetError::Timeout.into()) - } - - /// Establish a connection to the given `endpoint`. For each new connection, - /// it invokes the provided `callback`, and pass the connection to the callback. - pub async fn connect_with_cback<'a, Fut>( - self: &Arc, - ex: Executor<'a>, - endpoint: &Endpoint, - callback: impl FnOnce(Conn) -> Fut + Send + 'a, - ) -> Result<()> - where - Fut: Future> + Send + 'a, - { - let conn = self.connect(endpoint).await?; - - let selfc = self.clone(); - let endpoint = endpoint.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Outbound connection dropped: {err}"); - } - selfc - .monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) - .await; - selfc.connection_slots.remove().await; - }; - - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); - - Ok(()) - } -} diff --git a/p2p/src/net/listener.rs b/p2p/src/net/listener.rs deleted file mode 100644 index d1a7bfb..0000000 --- a/p2p/src/net/listener.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{error, info, trace}; - -use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - Executor, -}; - -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; - -use crate::{ - monitor::{ConnEvent, Monitor}, - Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating inbound connections with other peers. -pub struct Listener { - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Manages available inbound slots. - connection_slots: Arc, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl Listener { - /// Creates a new Listener - pub fn new(connection_slots: Arc, monitor: Arc) -> Arc { - Arc::new(Self { - connection_slots, - task_group: TaskGroup::new(), - monitor, - }) - } - - /// Starts a listener on the given `endpoint`. For each incoming connection - /// that is accepted, it invokes the provided `callback`, and pass the - /// connection to the callback. - /// - /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( - self: &Arc, - ex: Executor<'a>, - endpoint: Endpoint, - // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result - where - Fut: Future> + Send + 'a, - { - let listener = match listen(&endpoint).await { - Ok(listener) => { - self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) - .await; - listener - } - Err(err) => { - error!("Failed to listen on {endpoint}: {err}"); - self.monitor - .notify(&ConnEvent::ListenFailed(endpoint).into()) - .await; - return Err(err.into()); - } - }; - - let resolved_endpoint = listener.local_endpoint()?; - - info!("Start listening on {endpoint}"); - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }, - ); - Ok(resolved_endpoint) - } - - /// Shuts down the listener - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - async fn listen_loop<'a, Fut>( - self: Arc, - ex: Executor<'a>, - listener: Box, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result<()> - where - Fut: Future> + Send + 'a, - { - loop { - // Wait for an available inbound slot. - self.connection_slots.wait_for_slot().await; - let result = listener.accept().await; - - let conn = match result { - Ok(c) => { - self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) - .await; - c - } - Err(err) => { - error!("Failed to accept a new connection: {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); - } - }; - - self.connection_slots.add(); - - let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); - } - selfc - .monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) - .await; - selfc.connection_slots.remove().await; - }; - - let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); - } - } -} diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs deleted file mode 100644 index 9cdc748..0000000 --- a/p2p/src/net/mod.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod connection_queue; -mod connector; -mod listener; -mod slots; - -pub use connection_queue::ConnQueue; -pub use connector::Connector; -pub use listener::Listener; -pub use slots::ConnectionSlots; - -use std::fmt; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { - Inbound, - Outbound, -} - -impl fmt::Display for ConnDirection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - ConnDirection::Inbound => write!(f, "Inbound"), - ConnDirection::Outbound => write!(f, "Outbound"), - } - } -} diff --git a/p2p/src/net/slots.rs b/p2p/src/net/slots.rs deleted file mode 100644 index 99f0a78..0000000 --- a/p2p/src/net/slots.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -use karyons_core::async_utils::CondWait; - -/// Manages available inbound and outbound slots. -pub struct ConnectionSlots { - /// A condvar for notifying when a slot become available. - signal: CondWait, - /// The number of occupied slots - slots: AtomicUsize, - /// The maximum number of slots. - max_slots: usize, -} - -impl ConnectionSlots { - /// Creates a new ConnectionSlots - pub fn new(max_slots: usize) -> Self { - Self { - signal: CondWait::new(), - slots: AtomicUsize::new(0), - max_slots, - } - } - - /// Returns the number of occupied slots - pub fn load(&self) -> usize { - self.slots.load(Ordering::SeqCst) - } - - /// Increases the occupied slots by one. - pub fn add(&self) { - self.slots.fetch_add(1, Ordering::SeqCst); - } - - /// Decreases the occupied slots by one and notifies the waiting signal - /// to start accepting/connecting new connections. - pub async fn remove(&self) { - self.slots.fetch_sub(1, Ordering::SeqCst); - if self.slots.load(Ordering::SeqCst) < self.max_slots { - self.signal.signal().await; - } - } - - /// Waits for a slot to become available. - pub async fn wait_for_slot(&self) { - if self.slots.load(Ordering::SeqCst) < self.max_slots { - return; - } - - // Wait for a signal - self.signal.wait().await; - self.signal.reset().await; - } -} diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index ee0fdc4..60e76a1 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -20,9 +20,9 @@ use karyons_core::{ use karyons_net::Endpoint; use crate::{ + connection::ConnDirection, io_codec::{CodecMsg, IOCodec}, message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, - net::ConnDirection, peer_pool::{ArcPeerPool, WeakPeerPool}, protocol::{Protocol, ProtocolEvent, ProtocolID}, Config, Error, Result, diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index eac4d3d..2433cfc 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -20,11 +20,10 @@ use karyons_net::Conn; use crate::{ config::Config, + connection::{ConnDirection, ConnQueue}, io_codec::{CodecMsg, IOCodec}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, monitor::{Monitor, PeerPoolEvent}, - net::ConnDirection, - net::ConnQueue, peer::{ArcPeer, Peer, PeerID}, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, diff --git a/p2p/src/slots.rs b/p2p/src/slots.rs new file mode 100644 index 0000000..99f0a78 --- /dev/null +++ b/p2p/src/slots.rs @@ -0,0 +1,54 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +use karyons_core::async_utils::CondWait; + +/// Manages available inbound and outbound slots. +pub struct ConnectionSlots { + /// A condvar for notifying when a slot become available. + signal: CondWait, + /// The number of occupied slots + slots: AtomicUsize, + /// The maximum number of slots. + max_slots: usize, +} + +impl ConnectionSlots { + /// Creates a new ConnectionSlots + pub fn new(max_slots: usize) -> Self { + Self { + signal: CondWait::new(), + slots: AtomicUsize::new(0), + max_slots, + } + } + + /// Returns the number of occupied slots + pub fn load(&self) -> usize { + self.slots.load(Ordering::SeqCst) + } + + /// Increases the occupied slots by one. + pub fn add(&self) { + self.slots.fetch_add(1, Ordering::SeqCst); + } + + /// Decreases the occupied slots by one and notifies the waiting signal + /// to start accepting/connecting new connections. + pub async fn remove(&self) { + self.slots.fetch_sub(1, Ordering::SeqCst); + if self.slots.load(Ordering::SeqCst) < self.max_slots { + self.signal.signal().await; + } + } + + /// Waits for a slot to become available. + pub async fn wait_for_slot(&self) { + if self.slots.load(Ordering::SeqCst) < self.max_slots { + return; + } + + // Wait for a signal + self.signal.wait().await; + self.signal.reset().await; + } +} -- cgit v1.2.3