diff options
Diffstat (limited to 'p2p/src/listener.rs')
-rw-r--r-- | p2p/src/listener.rs | 65 |
1 files changed, 45 insertions, 20 deletions
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index f2391f7..58a0931 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -1,28 +1,36 @@ use std::{future::Future, sync::Arc}; -use log::{error, info, trace}; +use log::{debug, error, info}; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, + async_util::{TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; +use karyons_net::{listen, tls, Conn, Endpoint, Listener as NetListener}; use crate::{ monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, - Result, + tls_config::tls_server_config, + Error, Result, }; /// Responsible for creating inbound connections with other peers. pub struct Listener { + /// Identity Key pair + key_pair: KeyPair, + /// Managing spawned tasks. task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc<ConnectionSlots>, + /// Enables secure connection. + enable_tls: bool, + /// Responsible for network and system monitoring. monitor: Arc<Monitor>, } @@ -30,13 +38,17 @@ pub struct Listener { impl Listener { /// Creates a new Listener pub fn new( + key_pair: &KeyPair, connection_slots: Arc<ConnectionSlots>, + enable_tls: bool, monitor: Arc<Monitor>, ex: GlobalExecutor, ) -> Arc<Self> { Arc::new(Self { + key_pair: key_pair.clone(), connection_slots, task_group: TaskGroup::new(ex), + enable_tls, monitor, }) } @@ -55,7 +67,7 @@ impl Listener { where Fut: Future<Output = Result<()>> + Send + 'static, { - let listener = match listen(&endpoint).await { + let listener = match self.listend(&endpoint).await { Ok(listener) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -67,21 +79,17 @@ impl Listener { self.monitor .notify(&ConnEvent::ListenFailed(endpoint).into()) .await; - return Err(err.into()); + return Err(err); } }; let resolved_endpoint = listener.local_endpoint()?; - info!("Start listening on {endpoint}"); + info!("Start listening on {resolved_endpoint}"); let selfc = self.clone(); self.task_group - .spawn(selfc.listen_loop(listener, callback), |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }); + .spawn(selfc.listen_loop(listener, callback), |_| async {}); Ok(resolved_endpoint) } @@ -94,8 +102,7 @@ impl Listener { self: Arc<Self>, listener: Box<dyn NetListener>, callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, - ) -> Result<()> - where + ) where Fut: Future<Output = Result<()>> + Send + 'static, { loop { @@ -103,27 +110,35 @@ impl Listener { self.connection_slots.wait_for_slot().await; let result = listener.accept().await; - let conn = match result { + let (conn, endpoint) = match result { Ok(c) => { + let endpoint = match c.peer_endpoint() { + Ok(e) => e, + Err(err) => { + self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + error!("Failed to accept a new connection: {err}"); + continue; + } + }; + self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) + .notify(&ConnEvent::Accepted(endpoint.clone()).into()) .await; - c + (c, endpoint) } Err(err) => { error!("Failed to accept a new connection: {err}"); self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); + continue; } }; self.connection_slots.add(); let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; let on_disconnect = |res| async move { if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); + debug!("Inbound connection dropped: {err}"); } selfc .monitor @@ -136,4 +151,14 @@ impl Listener { self.task_group.spawn(callback(conn), on_disconnect); } } + + async fn listend(&self, endpoint: &Endpoint) -> Result<Box<dyn NetListener>> { + if self.enable_tls { + let tls_config = tls_server_config(&self.key_pair)?; + tls::listen(endpoint, tls_config).await + } else { + listen(endpoint).await + } + .map_err(Error::KaryonsNet) + } } |