From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- p2p/src/listener.rs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'p2p/src/listener.rs') diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 4a41482..1abf79a 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -3,13 +3,16 @@ use std::{future::Future, sync::Arc}; use log::{debug, error, info}; use karyon_core::{ - async_util::{Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{TaskGroup, TaskResult}, crypto::KeyPair, }; -use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint}; +use karyon_net::{tcp, tls, Conn, Endpoint}; use crate::{ + codec::NetMsgCodec, + message::NetMsg, monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, tls_config::tls_server_config, @@ -22,7 +25,7 @@ pub struct Listener { key_pair: KeyPair, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Manages available inbound slots. connection_slots: Arc, @@ -41,7 +44,7 @@ impl Listener { connection_slots: Arc, enable_tls: bool, monitor: Arc, - ex: Executor<'static>, + ex: Executor, ) -> Arc { Arc::new(Self { key_pair: key_pair.clone(), @@ -61,7 +64,7 @@ impl Listener { self: &Arc, endpoint: Endpoint, // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result where Fut: Future> + Send + 'static, @@ -82,7 +85,7 @@ impl Listener { } }; - let resolved_endpoint = listener.local_endpoint()?; + let resolved_endpoint = listener.local_endpoint().map_err(Error::from)?; info!("Start listening on {resolved_endpoint}"); @@ -99,8 +102,8 @@ impl Listener { async fn listen_loop( self: Arc, - listener: Box, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, + listener: karyon_net::Listener, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) where Fut: Future> + Send + 'static, { @@ -112,7 +115,7 @@ impl Listener { let (conn, endpoint) = match result { Ok(c) => { let endpoint = match c.peer_endpoint() { - Ok(e) => e, + Ok(ep) => ep, Err(err) => { self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; error!("Failed to accept a new connection: {err}"); @@ -151,16 +154,19 @@ impl Listener { } } - async fn listen(&self, endpoint: &Endpoint) -> Result { + async fn listen(&self, endpoint: &Endpoint) -> Result> { if self.enable_tls { - let tls_config = tls_server_config(&self.key_pair)?; - tls::listen(endpoint, tls_config) + let tls_config = tls::ServerTlsConfig { + tcp_config: Default::default(), + server_config: tls_server_config(&self.key_pair)?, + }; + tls::listen(endpoint, tls_config, NetMsgCodec::new()) .await - .map(|l| Box::new(l) as karyon_net::Listener) + .map(|l| Box::new(l) as karyon_net::Listener) } else { - tcp::listen(endpoint) + tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await - .map(|l| Box::new(l) as karyon_net::Listener) + .map(|l| Box::new(l) as karyon_net::Listener) } .map_err(Error::KaryonNet) } -- cgit v1.2.3