From 8c70a0d379b21541b5b2d1d37ff7fc61ca311cd4 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 30 Nov 2023 01:53:49 +0300 Subject: p2p/discovery: Select the first 8 entries from each bucket during the refresh process Instead of selecting random entries during the refresh process, choose the fist 8 entries from each bucket in the routing table. This ensures that only the oldest entries are refreshed. --- p2p/src/discovery/refresh.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index f797c71..180ac27 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -118,8 +118,8 @@ impl RefreshService { } /// Initiates periodic refreshing of the routing table. This function will - /// select 8 random entries from each bucket in the routing table and start - /// sending Ping messages to the entries. + /// selects the first 8 entries (oldest entries) from each bucket in the + /// routing table and starts sending Ping messages to the collected entries. async fn refresh_loop(self: Arc) -> Result<()> { let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { @@ -132,7 +132,11 @@ impl RefreshService { let mut entries: Vec = vec![]; for bucket in self.table.lock().await.iter() { - for entry in bucket.random_iter(8) { + for entry in bucket + .iter() + .filter(|e| !e.is_connected() && !e.is_incompatible()) + .take(8) + { entries.push(entry.clone()) } } @@ -142,22 +146,19 @@ impl RefreshService { } /// Iterates over the entries and spawns a new task for each entry to - /// initiate a connection attempt to that entry. + /// initiate a connection attempt. async fn do_refresh(self: Arc, entries: &[BucketEntry]) { let ex = &self.executor; + // Enforce a maximum of 16 concurrent connections. for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { - if bucket_entry.is_connected() || bucket_entry.is_incompatible() { - continue; - } - if bucket_entry.failures >= MAX_FAILURES { self.table .lock() .await .remove_entry(&bucket_entry.entry.key); - return; + continue; } tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) -- cgit v1.2.3