diff options
author | hozan23 <hozan23@proton.me> | 2023-11-12 11:26:38 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-12 11:26:38 +0300 |
commit | 5d91ead06c62fd7c3cd846659b935012616ce5ae (patch) | |
tree | fc6413ba9e486f786a44c82740ae3230c0384afe /p2p/src/net/listener.rs | |
parent | 24364ab8b4acf48f19d72c1b6d06c20478635eaf (diff) |
p2p: remove net directory
Diffstat (limited to 'p2p/src/net/listener.rs')
-rw-r--r-- | p2p/src/net/listener.rs | 142 |
1 files changed, 0 insertions, 142 deletions
diff --git a/p2p/src/net/listener.rs b/p2p/src/net/listener.rs deleted file mode 100644 index d1a7bfb..0000000 --- a/p2p/src/net/listener.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{error, info, trace}; - -use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - Executor, -}; - -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; - -use crate::{ - monitor::{ConnEvent, Monitor}, - Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating inbound connections with other peers. -pub struct Listener { - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Manages available inbound slots. - connection_slots: Arc<ConnectionSlots>, - - /// Responsible for network and system monitoring. - monitor: Arc<Monitor>, -} - -impl Listener { - /// Creates a new Listener - pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> { - Arc::new(Self { - connection_slots, - task_group: TaskGroup::new(), - monitor, - }) - } - - /// Starts a listener on the given `endpoint`. For each incoming connection - /// that is accepted, it invokes the provided `callback`, and pass the - /// connection to the callback. - /// - /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( - self: &Arc<Self>, - ex: Executor<'a>, - endpoint: Endpoint, - // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result<Endpoint> - where - Fut: Future<Output = Result<()>> + Send + 'a, - { - let listener = match listen(&endpoint).await { - Ok(listener) => { - self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) - .await; - listener - } - Err(err) => { - error!("Failed to listen on {endpoint}: {err}"); - self.monitor - .notify(&ConnEvent::ListenFailed(endpoint).into()) - .await; - return Err(err.into()); - } - }; - - let resolved_endpoint = listener.local_endpoint()?; - - info!("Start listening on {endpoint}"); - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }, - ); - Ok(resolved_endpoint) - } - - /// Shuts down the listener - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - async fn listen_loop<'a, Fut>( - self: Arc<Self>, - ex: Executor<'a>, - listener: Box<dyn NetListener>, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result<()> - where - Fut: Future<Output = Result<()>> + Send + 'a, - { - loop { - // Wait for an available inbound slot. - self.connection_slots.wait_for_slot().await; - let result = listener.accept().await; - - let conn = match result { - Ok(c) => { - self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) - .await; - c - } - Err(err) => { - error!("Failed to accept a new connection: {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); - } - }; - - self.connection_slots.add(); - - let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); - } - selfc - .monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) - .await; - selfc.connection_slots.remove().await; - }; - - let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); - } - } -} |