diff options
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r-- | p2p/src/discovery/mod.rs | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7f55309..2c1bcd8 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom}; use smol::lock::Mutex; use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; @@ -66,6 +67,7 @@ pub struct Discovery { impl Discovery { /// Creates a new Discovery pub fn new( + key_pair: &KeyPair, peer_id: &PeerID, conn_queue: Arc<ConnQueue>, config: Arc<Config>, @@ -81,6 +83,7 @@ impl Discovery { let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); let lookup_service = LookupService::new( + key_pair, peer_id, table.clone(), config.clone(), @@ -89,12 +92,21 @@ impl Discovery { ); let connector = Connector::new( + key_pair, config.max_connect_retries, outbound_slots.clone(), + config.enable_tls, + monitor.clone(), + ex.clone(), + ); + + let listener = Listener::new( + key_pair, + inbound_slots.clone(), + config.enable_tls, monitor.clone(), ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -222,7 +234,7 @@ impl Discovery { selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; } Err(Error::PeerAlreadyConnected) => { - // TODO + // TODO: Use the appropriate status. selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; } Err(_) => { @@ -236,10 +248,13 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(endpoint, cback).await; + let result = self + .connector + .connect_with_cback(endpoint, &pid, cback) + .await; if let Some(pid) = &pid { - match res { + match result { Ok(_) => { self.update_entry(pid, CONNECTED_ENTRY).await; } @@ -260,7 +275,8 @@ impl Discovery { match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); - if let Err(err) = self.lookup_service.start_lookup(&endpoint).await { + let peer_id = Some(entry.key.into()); + if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; error!("Failed to do lookup: {endpoint}: {err}"); } @@ -268,7 +284,7 @@ impl Discovery { None => { let peers = &self.config.bootstrap_peers; for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) { - if let Err(err) = self.lookup_service.start_lookup(endpoint).await { + if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await { error!("Failed to do lookup: {endpoint}: {err}"); } } |