diff options
| author | hozan23 <hozan23@proton.me> | 2023-11-09 11:38:19 +0300 | 
|---|---|---|
| committer | hozan23 <hozan23@proton.me> | 2023-11-09 11:38:19 +0300 | 
| commit | 849d827486c75b2ab223d7b0e638dbb5b74d4d1d (patch) | |
| tree | 41cd3babc37147ec4a40cab8ce8ae31c91cce33b /karyons_p2p/src/net | |
| parent | de1354525895ffbad18f90a5246fd65157f7449e (diff) | |
rename crates
Diffstat (limited to 'karyons_p2p/src/net')
| -rw-r--r-- | karyons_p2p/src/net/connection_queue.rs | 52 | ||||
| -rw-r--r-- | karyons_p2p/src/net/connector.rs | 125 | ||||
| -rw-r--r-- | karyons_p2p/src/net/listener.rs | 142 | ||||
| -rw-r--r-- | karyons_p2p/src/net/mod.rs | 27 | ||||
| -rw-r--r-- | karyons_p2p/src/net/slots.rs | 54 | 
5 files changed, 0 insertions, 400 deletions
| diff --git a/karyons_p2p/src/net/connection_queue.rs b/karyons_p2p/src/net/connection_queue.rs deleted file mode 100644 index fbc4bfc..0000000 --- a/karyons_p2p/src/net/connection_queue.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::sync::Arc; - -use smol::{channel::Sender, lock::Mutex}; - -use karyons_core::async_utils::CondVar; - -use karyons_net::Conn; - -use crate::net::ConnDirection; - -pub struct NewConn { -    pub direction: ConnDirection, -    pub conn: Conn, -    pub disconnect_signal: Sender<()>, -} - -/// Connection queue -pub struct ConnQueue { -    queue: Mutex<Vec<NewConn>>, -    conn_available: CondVar, -} - -impl ConnQueue { -    pub fn new() -> Arc<Self> { -        Arc::new(Self { -            queue: Mutex::new(Vec::new()), -            conn_available: CondVar::new(), -        }) -    } - -    /// Push a connection into the queue and wait for the disconnect signal -    pub async fn handle(&self, conn: Conn, direction: ConnDirection) { -        let (disconnect_signal, chan) = smol::channel::bounded(1); -        let new_conn = NewConn { -            direction, -            conn, -            disconnect_signal, -        }; -        self.queue.lock().await.push(new_conn); -        self.conn_available.signal(); -        let _ = chan.recv().await; -    } - -    /// Receive the next connection in the queue -    pub async fn next(&self) -> NewConn { -        let mut queue = self.queue.lock().await; -        while queue.is_empty() { -            queue = self.conn_available.wait(queue).await; -        } -        queue.pop().unwrap() -    } -} diff --git a/karyons_p2p/src/net/connector.rs b/karyons_p2p/src/net/connector.rs deleted file mode 100644 index 72dc0d8..0000000 --- a/karyons_p2p/src/net/connector.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{trace, warn}; - -use karyons_core::{ -    async_utils::{Backoff, TaskGroup, TaskResult}, -    Executor, -}; -use karyons_net::{dial, Conn, Endpoint, NetError}; - -use crate::{ -    monitor::{ConnEvent, Monitor}, -    Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating outbound connections with other peers. -pub struct Connector { -    /// Managing spawned tasks. -    task_group: TaskGroup, - -    /// Manages available outbound slots. -    connection_slots: Arc<ConnectionSlots>, - -    /// The maximum number of retries allowed before successfully -    /// establishing a connection. -    max_retries: usize, - -    /// Responsible for network and system monitoring. -    monitor: Arc<Monitor>, -} - -impl Connector { -    /// Creates a new Connector -    pub fn new( -        max_retries: usize, -        connection_slots: Arc<ConnectionSlots>, -        monitor: Arc<Monitor>, -    ) -> Arc<Self> { -        Arc::new(Self { -            task_group: TaskGroup::new(), -            monitor, -            connection_slots, -            max_retries, -        }) -    } - -    /// Shuts down the connector -    pub async fn shutdown(&self) { -        self.task_group.cancel().await; -    } - -    /// Establish a connection to the specified `endpoint`. If the connection -    /// attempt fails, it performs a backoff and retries until the maximum allowed -    /// number of retries is exceeded. On a successful connection, it returns a -    /// `Conn` instance. -    /// -    /// This method will block until it finds an available slot. -    pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> { -        self.connection_slots.wait_for_slot().await; -        self.connection_slots.add(); - -        let mut retry = 0; -        let backoff = Backoff::new(500, 2000); -        while retry < self.max_retries { -            let conn_result = dial(endpoint).await; - -            if let Ok(conn) = conn_result { -                self.monitor -                    .notify(&ConnEvent::Connected(endpoint.clone()).into()) -                    .await; -                return Ok(conn); -            } - -            self.monitor -                .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into()) -                .await; - -            backoff.sleep().await; - -            warn!("try to reconnect {endpoint}"); -            retry += 1; -        } - -        self.monitor -            .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into()) -            .await; - -        self.connection_slots.remove().await; -        Err(NetError::Timeout.into()) -    } - -    /// Establish a connection to the given `endpoint`. For each new connection, -    /// it invokes the provided `callback`, and pass the connection to the callback. -    pub async fn connect_with_cback<'a, Fut>( -        self: &Arc<Self>, -        ex: Executor<'a>, -        endpoint: &Endpoint, -        callback: impl FnOnce(Conn) -> Fut + Send + 'a, -    ) -> Result<()> -    where -        Fut: Future<Output = Result<()>> + Send + 'a, -    { -        let conn = self.connect(endpoint).await?; - -        let selfc = self.clone(); -        let endpoint = endpoint.clone(); -        let on_disconnect = |res| async move { -            if let TaskResult::Completed(Err(err)) = res { -                trace!("Outbound connection dropped: {err}"); -            } -            selfc -                .monitor -                .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) -                .await; -            selfc.connection_slots.remove().await; -        }; - -        self.task_group -            .spawn(ex.clone(), callback(conn), on_disconnect); - -        Ok(()) -    } -} diff --git a/karyons_p2p/src/net/listener.rs b/karyons_p2p/src/net/listener.rs deleted file mode 100644 index d1a7bfb..0000000 --- a/karyons_p2p/src/net/listener.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{error, info, trace}; - -use karyons_core::{ -    async_utils::{TaskGroup, TaskResult}, -    Executor, -}; - -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; - -use crate::{ -    monitor::{ConnEvent, Monitor}, -    Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating inbound connections with other peers. -pub struct Listener { -    /// Managing spawned tasks. -    task_group: TaskGroup, - -    /// Manages available inbound slots. -    connection_slots: Arc<ConnectionSlots>, - -    /// Responsible for network and system monitoring. -    monitor: Arc<Monitor>, -} - -impl Listener { -    /// Creates a new Listener -    pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> { -        Arc::new(Self { -            connection_slots, -            task_group: TaskGroup::new(), -            monitor, -        }) -    } - -    /// Starts a listener on the given `endpoint`. For each incoming connection -    /// that is accepted, it invokes the provided `callback`, and pass the -    /// connection to the callback. -    /// -    /// Returns the resloved listening endpoint. -    pub async fn start<'a, Fut>( -        self: &Arc<Self>, -        ex: Executor<'a>, -        endpoint: Endpoint, -        // https://github.com/rust-lang/rfcs/pull/2132 -        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, -    ) -> Result<Endpoint> -    where -        Fut: Future<Output = Result<()>> + Send + 'a, -    { -        let listener = match listen(&endpoint).await { -            Ok(listener) => { -                self.monitor -                    .notify(&ConnEvent::Listening(endpoint.clone()).into()) -                    .await; -                listener -            } -            Err(err) => { -                error!("Failed to listen on {endpoint}: {err}"); -                self.monitor -                    .notify(&ConnEvent::ListenFailed(endpoint).into()) -                    .await; -                return Err(err.into()); -            } -        }; - -        let resolved_endpoint = listener.local_endpoint()?; - -        info!("Start listening on {endpoint}"); - -        let selfc = self.clone(); -        self.task_group.spawn( -            ex.clone(), -            selfc.listen_loop(ex.clone(), listener, callback), -            |res| async move { -                if let TaskResult::Completed(Err(err)) = res { -                    error!("Listen loop stopped: {endpoint} {err}"); -                } -            }, -        ); -        Ok(resolved_endpoint) -    } - -    /// Shuts down the listener -    pub async fn shutdown(&self) { -        self.task_group.cancel().await; -    } - -    async fn listen_loop<'a, Fut>( -        self: Arc<Self>, -        ex: Executor<'a>, -        listener: Box<dyn NetListener>, -        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, -    ) -> Result<()> -    where -        Fut: Future<Output = Result<()>> + Send + 'a, -    { -        loop { -            // Wait for an available inbound slot. -            self.connection_slots.wait_for_slot().await; -            let result = listener.accept().await; - -            let conn = match result { -                Ok(c) => { -                    self.monitor -                        .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) -                        .await; -                    c -                } -                Err(err) => { -                    error!("Failed to accept a new connection: {err}"); -                    self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; -                    return Err(err.into()); -                } -            }; - -            self.connection_slots.add(); - -            let selfc = self.clone(); -            let endpoint = conn.peer_endpoint()?; -            let on_disconnect = |res| async move { -                if let TaskResult::Completed(Err(err)) = res { -                    trace!("Inbound connection dropped: {err}"); -                } -                selfc -                    .monitor -                    .notify(&ConnEvent::Disconnected(endpoint).into()) -                    .await; -                selfc.connection_slots.remove().await; -            }; - -            let callback = callback.clone(); -            self.task_group -                .spawn(ex.clone(), callback(conn), on_disconnect); -        } -    } -} diff --git a/karyons_p2p/src/net/mod.rs b/karyons_p2p/src/net/mod.rs deleted file mode 100644 index 9cdc748..0000000 --- a/karyons_p2p/src/net/mod.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod connection_queue; -mod connector; -mod listener; -mod slots; - -pub use connection_queue::ConnQueue; -pub use connector::Connector; -pub use listener::Listener; -pub use slots::ConnectionSlots; - -use std::fmt; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { -    Inbound, -    Outbound, -} - -impl fmt::Display for ConnDirection { -    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { -        match self { -            ConnDirection::Inbound => write!(f, "Inbound"), -            ConnDirection::Outbound => write!(f, "Outbound"), -        } -    } -} diff --git a/karyons_p2p/src/net/slots.rs b/karyons_p2p/src/net/slots.rs deleted file mode 100644 index 99f0a78..0000000 --- a/karyons_p2p/src/net/slots.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -use karyons_core::async_utils::CondWait; - -/// Manages available inbound and outbound slots. -pub struct ConnectionSlots { -    /// A condvar for notifying when a slot become available. -    signal: CondWait, -    /// The number of occupied slots -    slots: AtomicUsize, -    /// The maximum number of slots. -    max_slots: usize, -} - -impl ConnectionSlots { -    /// Creates a new ConnectionSlots -    pub fn new(max_slots: usize) -> Self { -        Self { -            signal: CondWait::new(), -            slots: AtomicUsize::new(0), -            max_slots, -        } -    } - -    /// Returns the number of occupied slots -    pub fn load(&self) -> usize { -        self.slots.load(Ordering::SeqCst) -    } - -    /// Increases the occupied slots by one. -    pub fn add(&self) { -        self.slots.fetch_add(1, Ordering::SeqCst); -    } - -    /// Decreases the occupied slots by one and notifies the waiting signal -    /// to start accepting/connecting new connections. -    pub async fn remove(&self) { -        self.slots.fetch_sub(1, Ordering::SeqCst); -        if self.slots.load(Ordering::SeqCst) < self.max_slots { -            self.signal.signal().await; -        } -    } - -    /// Waits for a slot to become available. -    pub async fn wait_for_slot(&self) { -        if self.slots.load(Ordering::SeqCst) < self.max_slots { -            return; -        } - -        // Wait for a signal -        self.signal.wait().await; -        self.signal.reset().await; -    } -} | 
