aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/net
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/net')
-rw-r--r--p2p/src/net/connection_queue.rs52
-rw-r--r--p2p/src/net/connector.rs125
-rw-r--r--p2p/src/net/listener.rs142
-rw-r--r--p2p/src/net/mod.rs27
-rw-r--r--p2p/src/net/slots.rs54
5 files changed, 400 insertions, 0 deletions
diff --git a/p2p/src/net/connection_queue.rs b/p2p/src/net/connection_queue.rs
new file mode 100644
index 0000000..fbc4bfc
--- /dev/null
+++ b/p2p/src/net/connection_queue.rs
@@ -0,0 +1,52 @@
+use std::sync::Arc;
+
+use smol::{channel::Sender, lock::Mutex};
+
+use karyons_core::async_utils::CondVar;
+
+use karyons_net::Conn;
+
+use crate::net::ConnDirection;
+
+pub struct NewConn {
+ pub direction: ConnDirection,
+ pub conn: Conn,
+ pub disconnect_signal: Sender<()>,
+}
+
+/// Connection queue
+pub struct ConnQueue {
+ queue: Mutex<Vec<NewConn>>,
+ conn_available: CondVar,
+}
+
+impl ConnQueue {
+ pub fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(Vec::new()),
+ conn_available: CondVar::new(),
+ })
+ }
+
+ /// Push a connection into the queue and wait for the disconnect signal
+ pub async fn handle(&self, conn: Conn, direction: ConnDirection) {
+ let (disconnect_signal, chan) = smol::channel::bounded(1);
+ let new_conn = NewConn {
+ direction,
+ conn,
+ disconnect_signal,
+ };
+ self.queue.lock().await.push(new_conn);
+ self.conn_available.signal();
+ let _ = chan.recv().await;
+ }
+
+ /// Receive the next connection in the queue
+ pub async fn next(&self) -> NewConn {
+ let mut queue = self.queue.lock().await;
+ while queue.is_empty() {
+ queue = self.conn_available.wait(queue).await;
+ }
+ queue.pop().unwrap()
+ }
+}
diff --git a/p2p/src/net/connector.rs b/p2p/src/net/connector.rs
new file mode 100644
index 0000000..72dc0d8
--- /dev/null
+++ b/p2p/src/net/connector.rs
@@ -0,0 +1,125 @@
+use std::{future::Future, sync::Arc};
+
+use log::{trace, warn};
+
+use karyons_core::{
+ async_utils::{Backoff, TaskGroup, TaskResult},
+ Executor,
+};
+use karyons_net::{dial, Conn, Endpoint, NetError};
+
+use crate::{
+ monitor::{ConnEvent, Monitor},
+ Result,
+};
+
+use super::slots::ConnectionSlots;
+
+/// Responsible for creating outbound connections with other peers.
+pub struct Connector {
+ /// Managing spawned tasks.
+ task_group: TaskGroup,
+
+ /// Manages available outbound slots.
+ connection_slots: Arc<ConnectionSlots>,
+
+ /// The maximum number of retries allowed before successfully
+ /// establishing a connection.
+ max_retries: usize,
+
+ /// Responsible for network and system monitoring.
+ monitor: Arc<Monitor>,
+}
+
+impl Connector {
+ /// Creates a new Connector
+ pub fn new(
+ max_retries: usize,
+ connection_slots: Arc<ConnectionSlots>,
+ monitor: Arc<Monitor>,
+ ) -> Arc<Self> {
+ Arc::new(Self {
+ task_group: TaskGroup::new(),
+ monitor,
+ connection_slots,
+ max_retries,
+ })
+ }
+
+ /// Shuts down the connector
+ pub async fn shutdown(&self) {
+ self.task_group.cancel().await;
+ }
+
+ /// Establish a connection to the specified `endpoint`. If the connection
+ /// attempt fails, it performs a backoff and retries until the maximum allowed
+ /// number of retries is exceeded. On a successful connection, it returns a
+ /// `Conn` instance.
+ ///
+ /// This method will block until it finds an available slot.
+ pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> {
+ self.connection_slots.wait_for_slot().await;
+ self.connection_slots.add();
+
+ let mut retry = 0;
+ let backoff = Backoff::new(500, 2000);
+ while retry < self.max_retries {
+ let conn_result = dial(endpoint).await;
+
+ if let Ok(conn) = conn_result {
+ self.monitor
+ .notify(&ConnEvent::Connected(endpoint.clone()).into())
+ .await;
+ return Ok(conn);
+ }
+
+ self.monitor
+ .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into())
+ .await;
+
+ backoff.sleep().await;
+
+ warn!("try to reconnect {endpoint}");
+ retry += 1;
+ }
+
+ self.monitor
+ .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into())
+ .await;
+
+ self.connection_slots.remove().await;
+ Err(NetError::Timeout.into())
+ }
+
+ /// Establish a connection to the given `endpoint`. For each new connection,
+ /// it invokes the provided `callback`, and pass the connection to the callback.
+ pub async fn connect_with_cback<'a, Fut>(
+ self: &Arc<Self>,
+ ex: Executor<'a>,
+ endpoint: &Endpoint,
+ callback: impl FnOnce(Conn) -> Fut + Send + 'a,
+ ) -> Result<()>
+ where
+ Fut: Future<Output = Result<()>> + Send + 'a,
+ {
+ let conn = self.connect(endpoint).await?;
+
+ let selfc = self.clone();
+ let endpoint = endpoint.clone();
+ let on_disconnect = |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ trace!("Outbound connection dropped: {err}");
+ }
+ selfc
+ .monitor
+ .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
+ .await;
+ selfc.connection_slots.remove().await;
+ };
+
+ self.task_group
+ .spawn(ex.clone(), callback(conn), on_disconnect);
+
+ Ok(())
+ }
+}
diff --git a/p2p/src/net/listener.rs b/p2p/src/net/listener.rs
new file mode 100644
index 0000000..d1a7bfb
--- /dev/null
+++ b/p2p/src/net/listener.rs
@@ -0,0 +1,142 @@
+use std::{future::Future, sync::Arc};
+
+use log::{error, info, trace};
+
+use karyons_core::{
+ async_utils::{TaskGroup, TaskResult},
+ Executor,
+};
+
+use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
+
+use crate::{
+ monitor::{ConnEvent, Monitor},
+ Result,
+};
+
+use super::slots::ConnectionSlots;
+
+/// Responsible for creating inbound connections with other peers.
+pub struct Listener {
+ /// Managing spawned tasks.
+ task_group: TaskGroup,
+
+ /// Manages available inbound slots.
+ connection_slots: Arc<ConnectionSlots>,
+
+ /// Responsible for network and system monitoring.
+ monitor: Arc<Monitor>,
+}
+
+impl Listener {
+ /// Creates a new Listener
+ pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> {
+ Arc::new(Self {
+ connection_slots,
+ task_group: TaskGroup::new(),
+ monitor,
+ })
+ }
+
+ /// Starts a listener on the given `endpoint`. For each incoming connection
+ /// that is accepted, it invokes the provided `callback`, and pass the
+ /// connection to the callback.
+ ///
+ /// Returns the resloved listening endpoint.
+ pub async fn start<'a, Fut>(
+ self: &Arc<Self>,
+ ex: Executor<'a>,
+ endpoint: Endpoint,
+ // https://github.com/rust-lang/rfcs/pull/2132
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ ) -> Result<Endpoint>
+ where
+ Fut: Future<Output = Result<()>> + Send + 'a,
+ {
+ let listener = match listen(&endpoint).await {
+ Ok(listener) => {
+ self.monitor
+ .notify(&ConnEvent::Listening(endpoint.clone()).into())
+ .await;
+ listener
+ }
+ Err(err) => {
+ error!("Failed to listen on {endpoint}: {err}");
+ self.monitor
+ .notify(&ConnEvent::ListenFailed(endpoint).into())
+ .await;
+ return Err(err.into());
+ }
+ };
+
+ let resolved_endpoint = listener.local_endpoint()?;
+
+ info!("Start listening on {endpoint}");
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ ex.clone(),
+ selfc.listen_loop(ex.clone(), listener, callback),
+ |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ error!("Listen loop stopped: {endpoint} {err}");
+ }
+ },
+ );
+ Ok(resolved_endpoint)
+ }
+
+ /// Shuts down the listener
+ pub async fn shutdown(&self) {
+ self.task_group.cancel().await;
+ }
+
+ async fn listen_loop<'a, Fut>(
+ self: Arc<Self>,
+ ex: Executor<'a>,
+ listener: Box<dyn NetListener>,
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ ) -> Result<()>
+ where
+ Fut: Future<Output = Result<()>> + Send + 'a,
+ {
+ loop {
+ // Wait for an available inbound slot.
+ self.connection_slots.wait_for_slot().await;
+ let result = listener.accept().await;
+
+ let conn = match result {
+ Ok(c) => {
+ self.monitor
+ .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into())
+ .await;
+ c
+ }
+ Err(err) => {
+ error!("Failed to accept a new connection: {err}");
+ self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ return Err(err.into());
+ }
+ };
+
+ self.connection_slots.add();
+
+ let selfc = self.clone();
+ let endpoint = conn.peer_endpoint()?;
+ let on_disconnect = |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ trace!("Inbound connection dropped: {err}");
+ }
+ selfc
+ .monitor
+ .notify(&ConnEvent::Disconnected(endpoint).into())
+ .await;
+ selfc.connection_slots.remove().await;
+ };
+
+ let callback = callback.clone();
+ self.task_group
+ .spawn(ex.clone(), callback(conn), on_disconnect);
+ }
+ }
+}
diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs
new file mode 100644
index 0000000..9cdc748
--- /dev/null
+++ b/p2p/src/net/mod.rs
@@ -0,0 +1,27 @@
+mod connection_queue;
+mod connector;
+mod listener;
+mod slots;
+
+pub use connection_queue::ConnQueue;
+pub use connector::Connector;
+pub use listener::Listener;
+pub use slots::ConnectionSlots;
+
+use std::fmt;
+
+/// Defines the direction of a network connection.
+#[derive(Clone, Debug)]
+pub enum ConnDirection {
+ Inbound,
+ Outbound,
+}
+
+impl fmt::Display for ConnDirection {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ ConnDirection::Inbound => write!(f, "Inbound"),
+ ConnDirection::Outbound => write!(f, "Outbound"),
+ }
+ }
+}
diff --git a/p2p/src/net/slots.rs b/p2p/src/net/slots.rs
new file mode 100644
index 0000000..99f0a78
--- /dev/null
+++ b/p2p/src/net/slots.rs
@@ -0,0 +1,54 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use karyons_core::async_utils::CondWait;
+
+/// Manages available inbound and outbound slots.
+pub struct ConnectionSlots {
+ /// A condvar for notifying when a slot become available.
+ signal: CondWait,
+ /// The number of occupied slots
+ slots: AtomicUsize,
+ /// The maximum number of slots.
+ max_slots: usize,
+}
+
+impl ConnectionSlots {
+ /// Creates a new ConnectionSlots
+ pub fn new(max_slots: usize) -> Self {
+ Self {
+ signal: CondWait::new(),
+ slots: AtomicUsize::new(0),
+ max_slots,
+ }
+ }
+
+ /// Returns the number of occupied slots
+ pub fn load(&self) -> usize {
+ self.slots.load(Ordering::SeqCst)
+ }
+
+ /// Increases the occupied slots by one.
+ pub fn add(&self) {
+ self.slots.fetch_add(1, Ordering::SeqCst);
+ }
+
+ /// Decreases the occupied slots by one and notifies the waiting signal
+ /// to start accepting/connecting new connections.
+ pub async fn remove(&self) {
+ self.slots.fetch_sub(1, Ordering::SeqCst);
+ if self.slots.load(Ordering::SeqCst) < self.max_slots {
+ self.signal.signal().await;
+ }
+ }
+
+ /// Waits for a slot to become available.
+ pub async fn wait_for_slot(&self) {
+ if self.slots.load(Ordering::SeqCst) < self.max_slots {
+ return;
+ }
+
+ // Wait for a signal
+ self.signal.wait().await;
+ self.signal.reset().await;
+ }
+}