aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/net
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/net')
-rw-r--r--p2p/src/net/connection_queue.rs52
-rw-r--r--p2p/src/net/connector.rs125
-rw-r--r--p2p/src/net/listener.rs142
-rw-r--r--p2p/src/net/mod.rs27
-rw-r--r--p2p/src/net/slots.rs54
5 files changed, 0 insertions, 400 deletions
diff --git a/p2p/src/net/connection_queue.rs b/p2p/src/net/connection_queue.rs
deleted file mode 100644
index 4c0de28..0000000
--- a/p2p/src/net/connection_queue.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use std::{collections::VecDeque, sync::Arc};
-
-use smol::{channel::Sender, lock::Mutex};
-
-use karyons_core::async_utils::CondVar;
-
-use karyons_net::Conn;
-
-use crate::net::ConnDirection;
-
-pub struct NewConn {
- pub direction: ConnDirection,
- pub conn: Conn,
- pub disconnect_signal: Sender<()>,
-}
-
-/// Connection queue
-pub struct ConnQueue {
- queue: Mutex<VecDeque<NewConn>>,
- conn_available: CondVar,
-}
-
-impl ConnQueue {
- pub fn new() -> Arc<Self> {
- Arc::new(Self {
- queue: Mutex::new(VecDeque::new()),
- conn_available: CondVar::new(),
- })
- }
-
- /// Push a connection into the queue and wait for the disconnect signal
- pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
- let (disconnect_signal, chan) = smol::channel::bounded(1);
- let new_conn = NewConn {
- direction,
- conn,
- disconnect_signal,
- };
- self.queue.lock().await.push_back(new_conn);
- self.conn_available.signal();
- let _ = chan.recv().await;
- }
-
- /// Receive the next connection in the queue
- pub async fn next(&self) -> NewConn {
- let mut queue = self.queue.lock().await;
- while queue.is_empty() {
- queue = self.conn_available.wait(queue).await;
- }
- queue.pop_front().unwrap()
- }
-}
diff --git a/p2p/src/net/connector.rs b/p2p/src/net/connector.rs
deleted file mode 100644
index 72dc0d8..0000000
--- a/p2p/src/net/connector.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-use std::{future::Future, sync::Arc};
-
-use log::{trace, warn};
-
-use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
- Executor,
-};
-use karyons_net::{dial, Conn, Endpoint, NetError};
-
-use crate::{
- monitor::{ConnEvent, Monitor},
- Result,
-};
-
-use super::slots::ConnectionSlots;
-
-/// Responsible for creating outbound connections with other peers.
-pub struct Connector {
- /// Managing spawned tasks.
- task_group: TaskGroup,
-
- /// Manages available outbound slots.
- connection_slots: Arc<ConnectionSlots>,
-
- /// The maximum number of retries allowed before successfully
- /// establishing a connection.
- max_retries: usize,
-
- /// Responsible for network and system monitoring.
- monitor: Arc<Monitor>,
-}
-
-impl Connector {
- /// Creates a new Connector
- pub fn new(
- max_retries: usize,
- connection_slots: Arc<ConnectionSlots>,
- monitor: Arc<Monitor>,
- ) -> Arc<Self> {
- Arc::new(Self {
- task_group: TaskGroup::new(),
- monitor,
- connection_slots,
- max_retries,
- })
- }
-
- /// Shuts down the connector
- pub async fn shutdown(&self) {
- self.task_group.cancel().await;
- }
-
- /// Establish a connection to the specified `endpoint`. If the connection
- /// attempt fails, it performs a backoff and retries until the maximum allowed
- /// number of retries is exceeded. On a successful connection, it returns a
- /// `Conn` instance.
- ///
- /// This method will block until it finds an available slot.
- pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> {
- self.connection_slots.wait_for_slot().await;
- self.connection_slots.add();
-
- let mut retry = 0;
- let backoff = Backoff::new(500, 2000);
- while retry < self.max_retries {
- let conn_result = dial(endpoint).await;
-
- if let Ok(conn) = conn_result {
- self.monitor
- .notify(&ConnEvent::Connected(endpoint.clone()).into())
- .await;
- return Ok(conn);
- }
-
- self.monitor
- .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into())
- .await;
-
- backoff.sleep().await;
-
- warn!("try to reconnect {endpoint}");
- retry += 1;
- }
-
- self.monitor
- .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into())
- .await;
-
- self.connection_slots.remove().await;
- Err(NetError::Timeout.into())
- }
-
- /// Establish a connection to the given `endpoint`. For each new connection,
- /// it invokes the provided `callback`, and pass the connection to the callback.
- pub async fn connect_with_cback<'a, Fut>(
- self: &Arc<Self>,
- ex: Executor<'a>,
- endpoint: &Endpoint,
- callback: impl FnOnce(Conn) -> Fut + Send + 'a,
- ) -> Result<()>
- where
- Fut: Future<Output = Result<()>> + Send + 'a,
- {
- let conn = self.connect(endpoint).await?;
-
- let selfc = self.clone();
- let endpoint = endpoint.clone();
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- trace!("Outbound connection dropped: {err}");
- }
- selfc
- .monitor
- .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
- .await;
- selfc.connection_slots.remove().await;
- };
-
- self.task_group
- .spawn(ex.clone(), callback(conn), on_disconnect);
-
- Ok(())
- }
-}
diff --git a/p2p/src/net/listener.rs b/p2p/src/net/listener.rs
deleted file mode 100644
index d1a7bfb..0000000
--- a/p2p/src/net/listener.rs
+++ /dev/null
@@ -1,142 +0,0 @@
-use std::{future::Future, sync::Arc};
-
-use log::{error, info, trace};
-
-use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
- Executor,
-};
-
-use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
-
-use crate::{
- monitor::{ConnEvent, Monitor},
- Result,
-};
-
-use super::slots::ConnectionSlots;
-
-/// Responsible for creating inbound connections with other peers.
-pub struct Listener {
- /// Managing spawned tasks.
- task_group: TaskGroup,
-
- /// Manages available inbound slots.
- connection_slots: Arc<ConnectionSlots>,
-
- /// Responsible for network and system monitoring.
- monitor: Arc<Monitor>,
-}
-
-impl Listener {
- /// Creates a new Listener
- pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> {
- Arc::new(Self {
- connection_slots,
- task_group: TaskGroup::new(),
- monitor,
- })
- }
-
- /// Starts a listener on the given `endpoint`. For each incoming connection
- /// that is accepted, it invokes the provided `callback`, and pass the
- /// connection to the callback.
- ///
- /// Returns the resloved listening endpoint.
- pub async fn start<'a, Fut>(
- self: &Arc<Self>,
- ex: Executor<'a>,
- endpoint: Endpoint,
- // https://github.com/rust-lang/rfcs/pull/2132
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
- ) -> Result<Endpoint>
- where
- Fut: Future<Output = Result<()>> + Send + 'a,
- {
- let listener = match listen(&endpoint).await {
- Ok(listener) => {
- self.monitor
- .notify(&ConnEvent::Listening(endpoint.clone()).into())
- .await;
- listener
- }
- Err(err) => {
- error!("Failed to listen on {endpoint}: {err}");
- self.monitor
- .notify(&ConnEvent::ListenFailed(endpoint).into())
- .await;
- return Err(err.into());
- }
- };
-
- let resolved_endpoint = listener.local_endpoint()?;
-
- info!("Start listening on {endpoint}");
-
- let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(ex.clone(), listener, callback),
- |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Listen loop stopped: {endpoint} {err}");
- }
- },
- );
- Ok(resolved_endpoint)
- }
-
- /// Shuts down the listener
- pub async fn shutdown(&self) {
- self.task_group.cancel().await;
- }
-
- async fn listen_loop<'a, Fut>(
- self: Arc<Self>,
- ex: Executor<'a>,
- listener: Box<dyn NetListener>,
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
- ) -> Result<()>
- where
- Fut: Future<Output = Result<()>> + Send + 'a,
- {
- loop {
- // Wait for an available inbound slot.
- self.connection_slots.wait_for_slot().await;
- let result = listener.accept().await;
-
- let conn = match result {
- Ok(c) => {
- self.monitor
- .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into())
- .await;
- c
- }
- Err(err) => {
- error!("Failed to accept a new connection: {err}");
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
- return Err(err.into());
- }
- };
-
- self.connection_slots.add();
-
- let selfc = self.clone();
- let endpoint = conn.peer_endpoint()?;
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- trace!("Inbound connection dropped: {err}");
- }
- selfc
- .monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
- .await;
- selfc.connection_slots.remove().await;
- };
-
- let callback = callback.clone();
- self.task_group
- .spawn(ex.clone(), callback(conn), on_disconnect);
- }
- }
-}
diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs
deleted file mode 100644
index 9cdc748..0000000
--- a/p2p/src/net/mod.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-mod connection_queue;
-mod connector;
-mod listener;
-mod slots;
-
-pub use connection_queue::ConnQueue;
-pub use connector::Connector;
-pub use listener::Listener;
-pub use slots::ConnectionSlots;
-
-use std::fmt;
-
-/// Defines the direction of a network connection.
-#[derive(Clone, Debug)]
-pub enum ConnDirection {
- Inbound,
- Outbound,
-}
-
-impl fmt::Display for ConnDirection {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- ConnDirection::Inbound => write!(f, "Inbound"),
- ConnDirection::Outbound => write!(f, "Outbound"),
- }
- }
-}
diff --git a/p2p/src/net/slots.rs b/p2p/src/net/slots.rs
deleted file mode 100644
index 99f0a78..0000000
--- a/p2p/src/net/slots.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use karyons_core::async_utils::CondWait;
-
-/// Manages available inbound and outbound slots.
-pub struct ConnectionSlots {
- /// A condvar for notifying when a slot become available.
- signal: CondWait,
- /// The number of occupied slots
- slots: AtomicUsize,
- /// The maximum number of slots.
- max_slots: usize,
-}
-
-impl ConnectionSlots {
- /// Creates a new ConnectionSlots
- pub fn new(max_slots: usize) -> Self {
- Self {
- signal: CondWait::new(),
- slots: AtomicUsize::new(0),
- max_slots,
- }
- }
-
- /// Returns the number of occupied slots
- pub fn load(&self) -> usize {
- self.slots.load(Ordering::SeqCst)
- }
-
- /// Increases the occupied slots by one.
- pub fn add(&self) {
- self.slots.fetch_add(1, Ordering::SeqCst);
- }
-
- /// Decreases the occupied slots by one and notifies the waiting signal
- /// to start accepting/connecting new connections.
- pub async fn remove(&self) {
- self.slots.fetch_sub(1, Ordering::SeqCst);
- if self.slots.load(Ordering::SeqCst) < self.max_slots {
- self.signal.signal().await;
- }
- }
-
- /// Waits for a slot to become available.
- pub async fn wait_for_slot(&self) {
- if self.slots.load(Ordering::SeqCst) < self.max_slots {
- return;
- }
-
- // Wait for a signal
- self.signal.wait().await;
- self.signal.reset().await;
- }
-}