From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 15:43:04 +0200 Subject: p2p: wrap the buckets with mutex in RoutingTable --- p2p/src/discovery/refresh.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 745a5d5..eec6743 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -5,10 +5,7 @@ use log::{error, info, trace}; use rand::{rngs::OsRng, RngCore}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult}, }; @@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]); pub struct RefreshService { /// Routing table - table: Arc>, + table: Arc, /// Resolved listen endpoint listen_endpoint: Option>, @@ -55,7 +52,7 @@ impl RefreshService { /// Creates a new refresh service pub fn new( config: Arc, - table: Arc>, + table: Arc, monitor: Arc, executor: Executor, ) -> Self { @@ -122,7 +119,7 @@ impl RefreshService { self.monitor.notify(DiscoveryEvent::RefreshStarted).await; let mut entries: Vec = vec![]; - for bucket in self.table.lock().await.iter() { + for bucket in self.table.buckets() { for entry in bucket .iter() .filter(|e| !e.is_connected() && !e.is_incompatible()) @@ -145,10 +142,7 @@ impl RefreshService { let mut tasks = Vec::new(); for bucket_entry in chunk { if bucket_entry.failures >= MAX_FAILURES { - self.table - .lock() - .await - .remove_entry(&bucket_entry.entry.key); + self.table.remove_entry(&bucket_entry.entry.key); continue; } @@ -167,16 +161,15 @@ impl RefreshService { let key = &bucket_entry.entry.key; match self.connect(&bucket_entry.entry).await { Ok(_) => { - self.table.lock().await.update_entry(key, PENDING_ENTRY); + self.table.update_entry(key, PENDING_ENTRY); } Err(err) => { trace!("Failed to refresh entry {:?}: {err}", key); - let table = &mut self.table.lock().await; if bucket_entry.failures >= MAX_FAILURES { - table.remove_entry(key); + self.table.remove_entry(key); return; } - table.update_entry(key, UNREACHABLE_ENTRY); + self.table.update_entry(key, UNREACHABLE_ENTRY); } } } -- cgit v1.2.3