From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- p2p/src/connector.rs | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'p2p/src/connector.rs') diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index de9e746..aea21ab 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -3,12 +3,15 @@ use std::{future::Future, sync::Arc}; use log::{error, trace, warn}; use karyon_core::{ - async_util::{Backoff, Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; -use karyon_net::{tcp, tls, Conn, Endpoint, NetError}; +use karyon_net::{tcp, tls, Conn, Endpoint, Error as NetError}; use crate::{ + codec::NetMsgCodec, + message::NetMsg, monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, tls_config::tls_client_config, @@ -23,7 +26,7 @@ pub struct Connector { key_pair: KeyPair, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Manages available outbound slots. connection_slots: Arc, @@ -47,7 +50,7 @@ impl Connector { connection_slots: Arc, enable_tls: bool, monitor: Arc, - ex: Executor<'static>, + ex: Executor, ) -> Arc { Arc::new(Self { key_pair: key_pair.clone(), @@ -70,7 +73,11 @@ impl Connector { /// `Conn` instance. /// /// This method will block until it finds an available slot. - pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option) -> Result { + pub async fn connect( + &self, + endpoint: &Endpoint, + peer_id: &Option, + ) -> Result> { self.connection_slots.wait_for_slot().await; self.connection_slots.add(); @@ -113,7 +120,7 @@ impl Connector { self: &Arc, endpoint: &Endpoint, peer_id: &Option, - callback: impl FnOnce(Conn) -> Fut + Send + 'static, + callback: impl FnOnce(Conn) -> Fut + Send + 'static, ) -> Result<()> where Fut: Future> + Send + 'static, @@ -138,14 +145,20 @@ impl Connector { Ok(()) } - async fn dial(&self, endpoint: &Endpoint, peer_id: &Option) -> Result { + async fn dial(&self, endpoint: &Endpoint, peer_id: &Option) -> Result> { if self.enable_tls { - let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?; - tls::dial(endpoint, tls_config, DNS_NAME) + let tls_config = tls::ClientTlsConfig { + tcp_config: Default::default(), + client_config: tls_client_config(&self.key_pair, peer_id.clone())?, + dns_name: DNS_NAME.to_string(), + }; + tls::dial(endpoint, tls_config, NetMsgCodec::new()) .await - .map(|l| Box::new(l) as Conn) + .map(|l| Box::new(l) as karyon_net::Conn) } else { - tcp::dial(endpoint).await.map(|l| Box::new(l) as Conn) + tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) + .await + .map(|l| Box::new(l) as karyon_net::Conn) } .map_err(Error::KaryonNet) } -- cgit v1.2.3