From 98a1de91a2dae06323558422c239e5a45fc86e7b Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 Nov 2023 22:41:33 +0300 Subject: implement TLS for inbound and outbound connections --- p2p/src/listener.rs | 65 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 20 deletions(-) (limited to 'p2p/src/listener.rs') diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index f2391f7..58a0931 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -1,28 +1,36 @@ use std::{future::Future, sync::Arc}; -use log::{error, info, trace}; +use log::{debug, error, info}; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, + async_util::{TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; +use karyons_net::{listen, tls, Conn, Endpoint, Listener as NetListener}; use crate::{ monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, - Result, + tls_config::tls_server_config, + Error, Result, }; /// Responsible for creating inbound connections with other peers. pub struct Listener { + /// Identity Key pair + key_pair: KeyPair, + /// Managing spawned tasks. task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc, + /// Enables secure connection. + enable_tls: bool, + /// Responsible for network and system monitoring. monitor: Arc, } @@ -30,13 +38,17 @@ pub struct Listener { impl Listener { /// Creates a new Listener pub fn new( + key_pair: &KeyPair, connection_slots: Arc, + enable_tls: bool, monitor: Arc, ex: GlobalExecutor, ) -> Arc { Arc::new(Self { + key_pair: key_pair.clone(), connection_slots, task_group: TaskGroup::new(ex), + enable_tls, monitor, }) } @@ -55,7 +67,7 @@ impl Listener { where Fut: Future> + Send + 'static, { - let listener = match listen(&endpoint).await { + let listener = match self.listend(&endpoint).await { Ok(listener) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -67,21 +79,17 @@ impl Listener { self.monitor .notify(&ConnEvent::ListenFailed(endpoint).into()) .await; - return Err(err.into()); + return Err(err); } }; let resolved_endpoint = listener.local_endpoint()?; - info!("Start listening on {endpoint}"); + info!("Start listening on {resolved_endpoint}"); let selfc = self.clone(); self.task_group - .spawn(selfc.listen_loop(listener, callback), |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }); + .spawn(selfc.listen_loop(listener, callback), |_| async {}); Ok(resolved_endpoint) } @@ -94,8 +102,7 @@ impl Listener { self: Arc, listener: Box, callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, - ) -> Result<()> - where + ) where Fut: Future> + Send + 'static, { loop { @@ -103,27 +110,35 @@ impl Listener { self.connection_slots.wait_for_slot().await; let result = listener.accept().await; - let conn = match result { + let (conn, endpoint) = match result { Ok(c) => { + let endpoint = match c.peer_endpoint() { + Ok(e) => e, + Err(err) => { + self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + error!("Failed to accept a new connection: {err}"); + continue; + } + }; + self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) + .notify(&ConnEvent::Accepted(endpoint.clone()).into()) .await; - c + (c, endpoint) } Err(err) => { error!("Failed to accept a new connection: {err}"); self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); + continue; } }; self.connection_slots.add(); let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; let on_disconnect = |res| async move { if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); + debug!("Inbound connection dropped: {err}"); } selfc .monitor @@ -136,4 +151,14 @@ impl Listener { self.task_group.spawn(callback(conn), on_disconnect); } } + + async fn listend(&self, endpoint: &Endpoint) -> Result> { + if self.enable_tls { + let tls_config = tls_server_config(&self.key_pair)?; + tls::listen(endpoint, tls_config).await + } else { + listen(endpoint).await + } + .map_err(Error::KaryonsNet) + } } -- cgit v1.2.3