aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-30 01:53:49 +0300
committerhozan23 <hozan23@proton.me>2023-11-30 01:53:49 +0300
commit8c70a0d379b21541b5b2d1d37ff7fc61ca311cd4 (patch)
treef0f85ed503cba8d1411a2fd6d518e14f38845934 /p2p/src/discovery
parent58b249773ba4d63a8cdde92ff1cb22719e9b3334 (diff)
p2p/discovery: Select the first 8 entries from each bucket during the
refresh process Instead of selecting random entries during the refresh process, choose the fist 8 entries from each bucket in the routing table. This ensures that only the oldest entries are refreshed.
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/refresh.rs19
1 files changed, 10 insertions, 9 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index f797c71..180ac27 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -118,8 +118,8 @@ impl RefreshService {
}
/// Initiates periodic refreshing of the routing table. This function will
- /// select 8 random entries from each bucket in the routing table and start
- /// sending Ping messages to the entries.
+ /// selects the first 8 entries (oldest entries) from each bucket in the
+ /// routing table and starts sending Ping messages to the collected entries.
async fn refresh_loop(self: Arc<Self>) -> Result<()> {
let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
@@ -132,7 +132,11 @@ impl RefreshService {
let mut entries: Vec<BucketEntry> = vec![];
for bucket in self.table.lock().await.iter() {
- for entry in bucket.random_iter(8) {
+ for entry in bucket
+ .iter()
+ .filter(|e| !e.is_connected() && !e.is_incompatible())
+ .take(8)
+ {
entries.push(entry.clone())
}
}
@@ -142,22 +146,19 @@ impl RefreshService {
}
/// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt to that entry.
+ /// initiate a connection attempt.
async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
let ex = &self.executor;
+ // Enforce a maximum of 16 concurrent connections.
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {
- if bucket_entry.is_connected() || bucket_entry.is_incompatible() {
- continue;
- }
-
if bucket_entry.failures >= MAX_FAILURES {
self.table
.lock()
.await
.remove_entry(&bucket_entry.entry.key);
- return;
+ continue;
}
tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))