aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r--p2p/src/discovery/mod.rs30
1 files changed, 23 insertions, 7 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7f55309..2c1bcd8 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use smol::lock::Mutex;
use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
@@ -66,6 +67,7 @@ pub struct Discovery {
impl Discovery {
/// Creates a new Discovery
pub fn new(
+ key_pair: &KeyPair,
peer_id: &PeerID,
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
@@ -81,6 +83,7 @@ impl Discovery {
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
let lookup_service = LookupService::new(
+ key_pair,
peer_id,
table.clone(),
config.clone(),
@@ -89,12 +92,21 @@ impl Discovery {
);
let connector = Connector::new(
+ key_pair,
config.max_connect_retries,
outbound_slots.clone(),
+ config.enable_tls,
+ monitor.clone(),
+ ex.clone(),
+ );
+
+ let listener = Listener::new(
+ key_pair,
+ inbound_slots.clone(),
+ config.enable_tls,
monitor.clone(),
ex.clone(),
);
- let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
Arc::new(Self {
refresh_service: Arc::new(refresh_service),
@@ -222,7 +234,7 @@ impl Discovery {
selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
- // TODO
+ // TODO: Use the appropriate status.
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
@@ -236,10 +248,13 @@ impl Discovery {
Ok(())
};
- let res = self.connector.connect_with_cback(endpoint, cback).await;
+ let result = self
+ .connector
+ .connect_with_cback(endpoint, &pid, cback)
+ .await;
if let Some(pid) = &pid {
- match res {
+ match result {
Ok(_) => {
self.update_entry(pid, CONNECTED_ENTRY).await;
}
@@ -260,7 +275,8 @@ impl Discovery {
match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
- if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
+ let peer_id = Some(entry.key.into());
+ if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
error!("Failed to do lookup: {endpoint}: {err}");
}
@@ -268,7 +284,7 @@ impl Discovery {
None => {
let peers = &self.config.bootstrap_peers;
for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
- if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
+ if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
error!("Failed to do lookup: {endpoint}: {err}");
}
}