aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/connector.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/connector.rs')
-rw-r--r--p2p/src/connector.rs35
1 files changed, 24 insertions, 11 deletions
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index de9e746..aea21ab 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -3,12 +3,15 @@ use std::{future::Future, sync::Arc};
use log::{error, trace, warn};
use karyon_core::{
- async_util::{Backoff, Executor, TaskGroup, TaskResult},
+ async_runtime::Executor,
+ async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
-use karyon_net::{tcp, tls, Conn, Endpoint, NetError};
+use karyon_net::{tcp, tls, Conn, Endpoint, Error as NetError};
use crate::{
+ codec::NetMsgCodec,
+ message::NetMsg,
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
tls_config::tls_client_config,
@@ -23,7 +26,7 @@ pub struct Connector {
key_pair: KeyPair,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// Manages available outbound slots.
connection_slots: Arc<ConnectionSlots>,
@@ -47,7 +50,7 @@ impl Connector {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: Executor<'static>,
+ ex: Executor,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
@@ -70,7 +73,11 @@ impl Connector {
/// `Conn` instance.
///
/// This method will block until it finds an available slot.
- pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
+ pub async fn connect(
+ &self,
+ endpoint: &Endpoint,
+ peer_id: &Option<PeerID>,
+ ) -> Result<Conn<NetMsg>> {
self.connection_slots.wait_for_slot().await;
self.connection_slots.add();
@@ -113,7 +120,7 @@ impl Connector {
self: &Arc<Self>,
endpoint: &Endpoint,
peer_id: &Option<PeerID>,
- callback: impl FnOnce(Conn) -> Fut + Send + 'static,
+ callback: impl FnOnce(Conn<NetMsg>) -> Fut + Send + 'static,
) -> Result<()>
where
Fut: Future<Output = Result<()>> + Send + 'static,
@@ -138,14 +145,20 @@ impl Connector {
Ok(())
}
- async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
+ async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> {
if self.enable_tls {
- let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?;
- tls::dial(endpoint, tls_config, DNS_NAME)
+ let tls_config = tls::ClientTlsConfig {
+ tcp_config: Default::default(),
+ client_config: tls_client_config(&self.key_pair, peer_id.clone())?,
+ dns_name: DNS_NAME.to_string(),
+ };
+ tls::dial(endpoint, tls_config, NetMsgCodec::new())
.await
- .map(|l| Box::new(l) as Conn)
+ .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
} else {
- tcp::dial(endpoint).await.map(|l| Box::new(l) as Conn)
+ tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
+ .await
+ .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
}
.map_err(Error::KaryonNet)
}