aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/connector.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/connector.rs')
-rw-r--r--p2p/src/connector.rs56
1 files changed, 42 insertions, 14 deletions
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index f41ab57..6fc5734 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -1,21 +1,28 @@
use std::{future::Future, sync::Arc};
-use log::{trace, warn};
+use log::{error, trace, warn};
use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
-use karyons_net::{dial, Conn, Endpoint, NetError};
+use karyons_net::{dial, tls, Conn, Endpoint, NetError};
use crate::{
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
- Result,
+ tls_config::tls_client_config,
+ Error, PeerID, Result,
};
+static DNS_NAME: &str = "karyons.org";
+
/// Responsible for creating outbound connections with other peers.
pub struct Connector {
+ /// Identity Key pair
+ key_pair: KeyPair,
+
/// Managing spawned tasks.
task_group: TaskGroup<'static>,
@@ -26,6 +33,9 @@ pub struct Connector {
/// establishing a connection.
max_retries: usize,
+ /// Enables secure connection.
+ enable_tls: bool,
+
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
}
@@ -33,16 +43,20 @@ pub struct Connector {
impl Connector {
/// Creates a new Connector
pub fn new(
+ key_pair: &KeyPair,
max_retries: usize,
connection_slots: Arc<ConnectionSlots>,
+ enable_tls: bool,
monitor: Arc<Monitor>,
ex: GlobalExecutor,
) -> Arc<Self> {
Arc::new(Self {
+ key_pair: key_pair.clone(),
+ max_retries,
task_group: TaskGroup::new(ex),
monitor,
connection_slots,
- max_retries,
+ enable_tls,
})
}
@@ -57,20 +71,23 @@ impl Connector {
/// `Conn` instance.
///
/// This method will block until it finds an available slot.
- pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> {
+ pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
self.connection_slots.wait_for_slot().await;
self.connection_slots.add();
let mut retry = 0;
let backoff = Backoff::new(500, 2000);
while retry < self.max_retries {
- let conn_result = dial(endpoint).await;
-
- if let Ok(conn) = conn_result {
- self.monitor
- .notify(&ConnEvent::Connected(endpoint.clone()).into())
- .await;
- return Ok(conn);
+ match self.dial(endpoint, peer_id).await {
+ Ok(conn) => {
+ self.monitor
+ .notify(&ConnEvent::Connected(endpoint.clone()).into())
+ .await;
+ return Ok(conn);
+ }
+ Err(err) => {
+ error!("Failed to establish a connection to {endpoint}: {err}");
+ }
}
self.monitor
@@ -96,12 +113,13 @@ impl Connector {
pub async fn connect_with_cback<Fut>(
self: &Arc<Self>,
endpoint: &Endpoint,
+ peer_id: &Option<PeerID>,
callback: impl FnOnce(Conn) -> Fut + Send + 'static,
) -> Result<()>
where
Fut: Future<Output = Result<()>> + Send + 'static,
{
- let conn = self.connect(endpoint).await?;
+ let conn = self.connect(endpoint, peer_id).await?;
let selfc = self.clone();
let endpoint = endpoint.clone();
@@ -120,4 +138,14 @@ impl Connector {
Ok(())
}
+
+ async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
+ if self.enable_tls {
+ let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?;
+ tls::dial(endpoint, tls_config, DNS_NAME).await
+ } else {
+ dial(endpoint).await
+ }
+ .map_err(Error::KaryonsNet)
+ }
}