From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 15:43:04 +0200 Subject: p2p: wrap the buckets with mutex in RoutingTable --- p2p/src/discovery/mod.rs | 53 +++++++++++++----------------------------------- 1 file changed, 14 insertions(+), 39 deletions(-) (limited to 'p2p/src/discovery/mod.rs') diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 19ae77a..64e5c14 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -7,7 +7,7 @@ use log::{error, info}; use rand::{rngs::OsRng, seq::SliceRandom}; use karyon_core::{ - async_runtime::{lock::Mutex, Executor}, + async_runtime::Executor, async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; @@ -22,8 +22,8 @@ use crate::{ message::NetMsg, monitor::Monitor, routing_table::{ - Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, - INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, + UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }, slots::ConnectionSlots, Error, PeerID, Result, @@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc; pub struct Discovery { /// Routing table - table: Arc>, + table: Arc, /// Lookup Service lookup_service: Arc, @@ -80,7 +80,7 @@ impl Discovery { let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); let table_key = peer_id.0; - let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); + let table = Arc::new(RoutingTable::new(table_key)); let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); @@ -201,8 +201,7 @@ impl Discovery { async fn connect_loop(self: Arc) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { - let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; - match random_table_entry { + match self.table.random_entry(PENDING_ENTRY) { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -233,18 +232,13 @@ impl Discovery { match result { Err(Error::IncompatiblePeer) => { error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; + selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); } Err(Error::PeerAlreadyConnected) => { - // TODO: Use an appropriate status. - selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; - } - Err(_) => { - selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; - } - Ok(_) => { - selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; + selfc.table.update_entry(&pid.0, CONNECTED_ENTRY) } + Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), + Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY), } Ok(()) @@ -257,12 +251,8 @@ impl Discovery { if let Some(pid) = &pid { match result { - Ok(_) => { - self.update_table_entry(pid, CONNECTED_ENTRY).await; - } - Err(_) => { - self.update_table_entry(pid, UNREACHABLE_ENTRY).await; - } + Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY), + Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY), } } } @@ -274,16 +264,12 @@ impl Discovery { /// table doesn't have an available entry, it will connect to one of the /// provided bootstrap endpoints in the `Config` and initiate the lookup. async fn start_seeding(&self) { - match self - .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) - .await - { + match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); let peer_id = Some(entry.key.into()); if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { - self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) - .await; + self.table.update_entry(&entry.key, UNSTABLE_ENTRY); error!("Failed to do lookup: {endpoint}: {err}"); } } @@ -297,15 +283,4 @@ impl Discovery { } } } - - /// Returns a random entry from routing table. - async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option { - self.table.lock().await.random_entry(entry_flag).cloned() - } - - /// Update the entry status - async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { - let table = &mut self.table.lock().await; - table.update_entry(&pid.0, entry_flag); - } } -- cgit v1.2.3