aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r--p2p/src/discovery/mod.rs53
1 files changed, 14 insertions, 39 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 19ae77a..64e5c14 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -7,7 +7,7 @@ use log::{error, info};
use rand::{rngs::OsRng, seq::SliceRandom};
use karyon_core::{
- async_runtime::{lock::Mutex, Executor},
+ async_runtime::Executor,
async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
@@ -22,8 +22,8 @@ use crate::{
message::NetMsg,
monitor::Monitor,
routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
- INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
+ UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
@@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;
pub struct Discovery {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Lookup Service
lookup_service: Arc<LookupService>,
@@ -80,7 +80,7 @@ impl Discovery {
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
let table_key = peer_id.0;
- let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
+ let table = Arc::new(RoutingTable::new(table_key));
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
@@ -201,8 +201,7 @@ impl Discovery {
async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
- let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
- match random_table_entry {
+ match self.table.random_entry(PENDING_ENTRY) {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -233,18 +232,13 @@ impl Discovery {
match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
}
Err(Error::PeerAlreadyConnected) => {
- // TODO: Use an appropriate status.
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
- }
- Err(_) => {
- selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
- }
- Ok(_) => {
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
+ selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
}
+ Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
Ok(())
@@ -257,12 +251,8 @@ impl Discovery {
if let Some(pid) = &pid {
match result {
- Ok(_) => {
- self.update_table_entry(pid, CONNECTED_ENTRY).await;
- }
- Err(_) => {
- self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
- }
+ Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
+ Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
}
}
}
@@ -274,16 +264,12 @@ impl Discovery {
/// table doesn't have an available entry, it will connect to one of the
/// provided bootstrap endpoints in the `Config` and initiate the lookup.
async fn start_seeding(&self) {
- match self
- .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
- .await
- {
+ match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
let peer_id = Some(entry.key.into());
if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
- self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
- .await;
+ self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
error!("Failed to do lookup: {endpoint}: {err}");
}
}
@@ -297,15 +283,4 @@ impl Discovery {
}
}
}
-
- /// Returns a random entry from routing table.
- async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
- self.table.lock().await.random_entry(entry_flag).cloned()
- }
-
- /// Update the entry status
- async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
- let table = &mut self.table.lock().await;
- table.update_entry(&pid.0, entry_flag);
- }
}