From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001
From: hozan23
Date: Thu, 23 May 2024 15:43:04 +0200
Subject: p2p: wrap the buckets with mutex in RoutingTable

---
 p2p/src/discovery/lookup.rs  | 28 ++++++++---------------
 p2p/src/discovery/mod.rs     | 53 ++++++++++++--------------------------------
 p2p/src/discovery/refresh.rs | 23 +++++++------------
 3 files changed, 31 insertions(+), 73 deletions(-)

(limited to 'p2p/src/discovery')

diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 4a06083..3beea7e 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,10 +5,7 @@ use log::{error, trace};
 use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
 
 use karyon_core::{
-    async_runtime::{
-        lock::{Mutex, RwLock},
-        Executor,
-    },
+    async_runtime::{lock::RwLock, Executor},
     async_util::timeout,
     crypto::KeyPair,
     util::decode,
@@ -38,7 +35,7 @@ pub struct LookupService {
     id: PeerID,
 
     /// Routing Table
-    table: Arc<Mutex<RoutingTable>>,
+    table: Arc<RoutingTable>,
 
     /// Listener
     listener: Arc<Listener>,
@@ -63,7 +60,7 @@ impl LookupService {
     pub fn new(
         key_pair: &KeyPair,
         id: &PeerID,
-        table: Arc<Mutex<RoutingTable>>,
+        table: Arc<RoutingTable>,
         config: Arc<Config>,
         monitor: Arc<Monitor>,
         ex: Executor,
@@ -158,12 +155,10 @@ impl LookupService {
             }
         }
 
-        let mut table = self.table.lock().await;
         for peer in peer_buffer.iter() {
-            let result = table.add_entry(peer.clone().into());
+            let result = self.table.add_entry(peer.clone().into());
             trace!("Add entry {:?}", result);
         }
-        drop(table);
 
         self.monitor
             .notify(DiscoveryEvent::LookupSucceeded(
@@ -190,11 +185,10 @@ impl LookupService {
             .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
             .await?;
 
-        let table = self.table.lock().await;
         for peer in peers {
             if random_peers.contains(&peer)
                 || peer.peer_id == self.id
-                || table.contains_key(&peer.peer_id.0)
+                || self.table.contains_key(&peer.peer_id.0)
             {
                 continue;
             }
@@ -233,10 +227,6 @@ impl LookupService {
         target_peer_id: &PeerID,
     ) -> Result<Vec<PeerMsg>> {
         let conn = self.connector.connect(&endpoint, &peer_id).await?;
-        self.monitor
-            .notify(ConnEvent::Connected(endpoint.clone()))
-            .await;
-
         let result = self.handle_outbound(conn, target_peer_id).await;
 
         self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
@@ -318,7 +308,7 @@ impl LookupService {
             }
             NetMsgCmd::Peer => {
                 let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
-                let result = self.table.lock().await.add_entry(peer.clone().into());
+                let result = self.table.add_entry(peer.clone().into());
                 trace!("Add entry result: {:?}", result);
             }
             c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
@@ -381,9 +371,9 @@ impl LookupService {
     /// Sends a Peers msg.
     async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn) -> Result<()> {
         trace!("Send Peers msg");
-        let table = self.table.lock().await;
-        let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
-        drop(table);
+        let entries = self
+            .table
+            .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
         let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
 
         conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?)
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 19ae77a..64e5c14 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -7,7 +7,7 @@ use log::{error, info};
 use rand::{rngs::OsRng, seq::SliceRandom};
 
 use karyon_core::{
-    async_runtime::{lock::Mutex, Executor},
+    async_runtime::Executor,
     async_util::{Backoff, TaskGroup, TaskResult},
     crypto::KeyPair,
 };
@@ -22,8 +22,8 @@ use crate::{
     message::NetMsg,
     monitor::Monitor,
     routing_table::{
-        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
-        INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+        RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
+        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
     },
     slots::ConnectionSlots,
     Error, PeerID, Result,
@@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;
 
 pub struct Discovery {
     /// Routing table
-    table: Arc<Mutex<RoutingTable>>,
+    table: Arc<RoutingTable>,
 
     /// Lookup Service
     lookup_service: Arc<LookupService>,
@@ -80,7 +80,7 @@ impl Discovery {
         let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
 
         let table_key = peer_id.0;
-        let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
+        let table = Arc::new(RoutingTable::new(table_key));
 
         let refresh_service =
             RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
@@ -201,8 +201,7 @@ impl Discovery {
     async fn connect_loop(self: Arc<Self>) -> Result<()> {
         let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
         loop {
-            let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
-            match random_table_entry {
+            match self.table.random_entry(PENDING_ENTRY) {
                 Some(entry) => {
                     backoff.reset();
                     let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -233,18 +232,13 @@ impl Discovery {
             match result {
                 Err(Error::IncompatiblePeer) => {
                     error!("Failed to do handshake: {endpoint_c} incompatible peer");
-                    selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
+                    selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
                 }
                 Err(Error::PeerAlreadyConnected) => {
-                    // TODO: Use an appropriate status.
-                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
-                }
-                Err(_) => {
-                    selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
-                }
-                Ok(_) => {
-                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
+                    selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
                 }
+                Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+                Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
             }
 
             Ok(())
@@ -257,12 +251,8 @@ impl Discovery {
 
         if let Some(pid) = &pid {
             match result {
-                Ok(_) => {
-                    self.update_table_entry(pid, CONNECTED_ENTRY).await;
-                }
-                Err(_) => {
-                    self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
-                }
+                Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
+                Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
             }
         }
     }
@@ -274,16 +264,12 @@ impl Discovery {
     /// table doesn't have an available entry, it will connect to one of the
     /// provided bootstrap endpoints in the `Config` and initiate the lookup.
     async fn start_seeding(&self) {
-        match self
-            .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
-            .await
-        {
+        match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
             Some(entry) => {
                 let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
                 let peer_id = Some(entry.key.into());
                 if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
-                    self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
-                        .await;
+                    self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
                     error!("Failed to do lookup: {endpoint}: {err}");
                 }
             }
@@ -297,15 +283,4 @@ impl Discovery {
             }
         }
     }
-
-    /// Returns a random entry from routing table.
-    async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
-        self.table.lock().await.random_entry(entry_flag).cloned()
-    }
-
-    /// Update the entry status
-    async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
-        let table = &mut self.table.lock().await;
-        table.update_entry(&pid.0, entry_flag);
-    }
 }
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 745a5d5..eec6743 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -5,10 +5,7 @@ use log::{error, info, trace};
 use rand::{rngs::OsRng, RngCore};
 
 use karyon_core::{
-    async_runtime::{
-        lock::{Mutex, RwLock},
-        Executor,
-    },
+    async_runtime::{lock::RwLock, Executor},
     async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
 };
 
@@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]);
 
 pub struct RefreshService {
     /// Routing table
-    table: Arc<Mutex<RoutingTable>>,
+    table: Arc<RoutingTable>,
 
     /// Resolved listen endpoint
     listen_endpoint: Option<RwLock<Endpoint>>,
@@ -55,7 +52,7 @@ impl RefreshService {
     /// Creates a new refresh service
     pub fn new(
         config: Arc<Config>,
-        table: Arc<Mutex<RoutingTable>>,
+        table: Arc<RoutingTable>,
         monitor: Arc<Monitor>,
         executor: Executor,
     ) -> Self {
@@ -122,7 +119,7 @@ impl RefreshService {
         self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
 
         let mut entries: Vec<BucketEntry> = vec![];
-        for bucket in self.table.lock().await.iter() {
+        for bucket in self.table.buckets() {
             for entry in bucket
                 .iter()
                 .filter(|e| !e.is_connected() && !e.is_incompatible())
@@ -145,10 +142,7 @@ impl RefreshService {
         let mut tasks = Vec::new();
 
         for bucket_entry in chunk {
             if bucket_entry.failures >= MAX_FAILURES {
-                self.table
-                    .lock()
-                    .await
-                    .remove_entry(&bucket_entry.entry.key);
+                self.table.remove_entry(&bucket_entry.entry.key);
                 continue;
             }
@@ -167,16 +161,15 @@ impl RefreshService {
         let key = &bucket_entry.entry.key;
         match self.connect(&bucket_entry.entry).await {
             Ok(_) => {
-                self.table.lock().await.update_entry(key, PENDING_ENTRY);
+                self.table.update_entry(key, PENDING_ENTRY);
             }
             Err(err) => {
                 trace!("Failed to refresh entry {:?}: {err}", key);
-                let table = &mut self.table.lock().await;
                 if bucket_entry.failures >= MAX_FAILURES {
-                    table.remove_entry(key);
+                    self.table.remove_entry(key);
                     return;
                 }
-                table.update_entry(key, UNREACHABLE_ENTRY);
+                self.table.update_entry(key, UNREACHABLE_ENTRY);
            }
         }
     }
-- 
cgit v1.2.3
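The hunks above touch only the call sites under p2p/src/discovery; the commit subject implies that RoutingTable itself now wraps its buckets in a mutex and exposes &self methods (add_entry, update_entry, random_entry, remove_entry, contains_key, closest_entries, buckets), so callers can share a plain Arc<RoutingTable> and drop every outer lock().await. The routing_table change itself is not part of this filtered diff. Below is a minimal sketch of what such an internally synchronized table could look like; the field layout, Key and flag types, fixed bucket count, and use of std::sync::Mutex are illustrative assumptions, not karyon's actual implementation.

// Sketch only: illustrative names and bodies, not the actual karyon routing_table code.
use std::sync::Mutex;

type Key = [u8; 32];
type EntryStatusFlag = u16;

#[derive(Clone)]
struct Entry {
    key: Key,
    status: EntryStatusFlag,
    // addr, port, discovery_port, ... omitted here
}

#[derive(Clone, Default)]
struct Bucket {
    entries: Vec<Entry>,
}

impl Bucket {
    fn iter(&self) -> impl Iterator<Item = &Entry> {
        self.entries.iter()
    }
}

/// A routing table that guards its buckets with an internal, synchronous mutex,
/// so callers can share it as `Arc<RoutingTable>` and call methods on `&self`
/// without an outer `lock().await`.
struct RoutingTable {
    // The local key would normally drive distance-based bucketing; kept only as a field here.
    _key: Key,
    buckets: Mutex<Vec<Bucket>>,
}

impl RoutingTable {
    fn new(key: Key) -> Self {
        Self {
            _key: key,
            buckets: Mutex::new(vec![Bucket::default(); 32]),
        }
    }

    /// Flips the status flag of the entry with the given key, if present.
    fn update_entry(&self, key: &Key, status: EntryStatusFlag) {
        let mut buckets = self.buckets.lock().unwrap();
        for bucket in buckets.iter_mut() {
            if let Some(entry) = bucket.entries.iter_mut().find(|e| &e.key == key) {
                entry.status = status;
                return;
            }
        }
    }

    /// Returns an owned snapshot of the buckets so iteration never holds the lock.
    fn buckets(&self) -> Vec<Bucket> {
        self.buckets.lock().unwrap().clone()
    }

    // add_entry, random_entry, contains_key, closest_entries, and remove_entry
    // would follow the same lock-inside pattern.
}

With a table shaped like this, Discovery, LookupService, and RefreshService each hold an Arc<RoutingTable> clone, which is exactly what the hunks above switch to, and a snapshot-returning buckets() keeps the refresh loop from holding the lock across an .await point.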