aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/listener.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/listener.rs')
-rw-r--r--p2p/src/listener.rs65
1 files changed, 45 insertions, 20 deletions
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index f2391f7..58a0931 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -1,28 +1,36 @@
use std::{future::Future, sync::Arc};
-use log::{error, info, trace};
+use log::{debug, error, info};
use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
+ async_util::{TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
-use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
+use karyons_net::{listen, tls, Conn, Endpoint, Listener as NetListener};
use crate::{
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
- Result,
+ tls_config::tls_server_config,
+ Error, Result,
};
/// Responsible for creating inbound connections with other peers.
pub struct Listener {
+ /// Identity Key pair
+ key_pair: KeyPair,
+
/// Managing spawned tasks.
task_group: TaskGroup<'static>,
/// Manages available inbound slots.
connection_slots: Arc<ConnectionSlots>,
+ /// Enables secure connection.
+ enable_tls: bool,
+
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
}
@@ -30,13 +38,17 @@ pub struct Listener {
impl Listener {
/// Creates a new Listener
pub fn new(
+ key_pair: &KeyPair,
connection_slots: Arc<ConnectionSlots>,
+ enable_tls: bool,
monitor: Arc<Monitor>,
ex: GlobalExecutor,
) -> Arc<Self> {
Arc::new(Self {
+ key_pair: key_pair.clone(),
connection_slots,
task_group: TaskGroup::new(ex),
+ enable_tls,
monitor,
})
}
@@ -55,7 +67,7 @@ impl Listener {
where
Fut: Future<Output = Result<()>> + Send + 'static,
{
- let listener = match listen(&endpoint).await {
+ let listener = match self.listend(&endpoint).await {
Ok(listener) => {
self.monitor
.notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -67,21 +79,17 @@ impl Listener {
self.monitor
.notify(&ConnEvent::ListenFailed(endpoint).into())
.await;
- return Err(err.into());
+ return Err(err);
}
};
let resolved_endpoint = listener.local_endpoint()?;
- info!("Start listening on {endpoint}");
+ info!("Start listening on {resolved_endpoint}");
let selfc = self.clone();
self.task_group
- .spawn(selfc.listen_loop(listener, callback), |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Listen loop stopped: {endpoint} {err}");
- }
- });
+ .spawn(selfc.listen_loop(listener, callback), |_| async {});
Ok(resolved_endpoint)
}
@@ -94,8 +102,7 @@ impl Listener {
self: Arc<Self>,
listener: Box<dyn NetListener>,
callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
- ) -> Result<()>
- where
+ ) where
Fut: Future<Output = Result<()>> + Send + 'static,
{
loop {
@@ -103,27 +110,35 @@ impl Listener {
self.connection_slots.wait_for_slot().await;
let result = listener.accept().await;
- let conn = match result {
+ let (conn, endpoint) = match result {
Ok(c) => {
+ let endpoint = match c.peer_endpoint() {
+ Ok(e) => e,
+ Err(err) => {
+ self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ error!("Failed to accept a new connection: {err}");
+ continue;
+ }
+ };
+
self.monitor
- .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into())
+ .notify(&ConnEvent::Accepted(endpoint.clone()).into())
.await;
- c
+ (c, endpoint)
}
Err(err) => {
error!("Failed to accept a new connection: {err}");
self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
- return Err(err.into());
+ continue;
}
};
self.connection_slots.add();
let selfc = self.clone();
- let endpoint = conn.peer_endpoint()?;
let on_disconnect = |res| async move {
if let TaskResult::Completed(Err(err)) = res {
- trace!("Inbound connection dropped: {err}");
+ debug!("Inbound connection dropped: {err}");
}
selfc
.monitor
@@ -136,4 +151,14 @@ impl Listener {
self.task_group.spawn(callback(conn), on_disconnect);
}
}
+
+ async fn listend(&self, endpoint: &Endpoint) -> Result<Box<dyn NetListener>> {
+ if self.enable_tls {
+ let tls_config = tls_server_config(&self.key_pair)?;
+ tls::listen(endpoint, tls_config).await
+ } else {
+ listen(endpoint).await
+ }
+ .map_err(Error::KaryonsNet)
+ }
}