aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/listener.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/listener.rs')
-rw-r--r--p2p/src/listener.rs36
1 files changed, 21 insertions, 15 deletions
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 4a41482..1abf79a 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -3,13 +3,16 @@ use std::{future::Future, sync::Arc};
use log::{debug, error, info};
use karyon_core::{
- async_util::{Executor, TaskGroup, TaskResult},
+ async_runtime::Executor,
+ async_util::{TaskGroup, TaskResult},
crypto::KeyPair,
};
-use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint};
+use karyon_net::{tcp, tls, Conn, Endpoint};
use crate::{
+ codec::NetMsgCodec,
+ message::NetMsg,
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
tls_config::tls_server_config,
@@ -22,7 +25,7 @@ pub struct Listener {
key_pair: KeyPair,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// Manages available inbound slots.
connection_slots: Arc<ConnectionSlots>,
@@ -41,7 +44,7 @@ impl Listener {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: Executor<'static>,
+ ex: Executor,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
@@ -61,7 +64,7 @@ impl Listener {
self: &Arc<Self>,
endpoint: Endpoint,
// https://github.com/rust-lang/rfcs/pull/2132
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
+ callback: impl FnOnce(Conn<NetMsg>) -> Fut + Clone + Send + 'static,
) -> Result<Endpoint>
where
Fut: Future<Output = Result<()>> + Send + 'static,
@@ -82,7 +85,7 @@ impl Listener {
}
};
- let resolved_endpoint = listener.local_endpoint()?;
+ let resolved_endpoint = listener.local_endpoint().map_err(Error::from)?;
info!("Start listening on {resolved_endpoint}");
@@ -99,8 +102,8 @@ impl Listener {
async fn listen_loop<Fut>(
self: Arc<Self>,
- listener: Box<dyn ConnListener>,
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
+ listener: karyon_net::Listener<NetMsg>,
+ callback: impl FnOnce(Conn<NetMsg>) -> Fut + Clone + Send + 'static,
) where
Fut: Future<Output = Result<()>> + Send + 'static,
{
@@ -112,7 +115,7 @@ impl Listener {
let (conn, endpoint) = match result {
Ok(c) => {
let endpoint = match c.peer_endpoint() {
- Ok(e) => e,
+ Ok(ep) => ep,
Err(err) => {
self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
error!("Failed to accept a new connection: {err}");
@@ -151,16 +154,19 @@ impl Listener {
}
}
- async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener> {
+ async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> {
if self.enable_tls {
- let tls_config = tls_server_config(&self.key_pair)?;
- tls::listen(endpoint, tls_config)
+ let tls_config = tls::ServerTlsConfig {
+ tcp_config: Default::default(),
+ server_config: tls_server_config(&self.key_pair)?,
+ };
+ tls::listen(endpoint, tls_config, NetMsgCodec::new())
.await
- .map(|l| Box::new(l) as karyon_net::Listener)
+ .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
} else {
- tcp::listen(endpoint)
+ tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
- .map(|l| Box::new(l) as karyon_net::Listener)
+ .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
}
.map_err(Error::KaryonNet)
}