aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-29 21:16:46 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-29 21:16:46 +0200
commit5c0abab1b7bf0f2858c451d6f0efc7ca0e138fc6 (patch)
tree9d64a261ddd289560365b71f5d02d31df6c4a0ec /p2p/src/discovery/refresh.rs
parentbcc6721257889f85f57af1b40351540585ffd41d (diff)
use shadown variables to name clones and place them between {} when spawning new tasks
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs45
1 files changed, 23 insertions, 22 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index c1d222b..b4f5396 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -38,9 +38,6 @@ pub struct RefreshService {
/// Managing spawned tasks.
task_group: TaskGroup,
- /// A global executor
- executor: Executor,
-
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -65,7 +62,6 @@ impl RefreshService {
table,
listen_endpoint,
task_group: TaskGroup::with_executor(executor.clone()),
- executor,
config,
monitor,
}
@@ -74,24 +70,32 @@ impl RefreshService {
/// Start the refresh service
pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await;
+ let endpoint = endpoint.read().await.clone();
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(endpoint).await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- });
+ },
+ );
}
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.refresh_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.refresh_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -133,25 +137,22 @@ impl RefreshService {
}
}
- /// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt.
+ /// Iterates over the entries and initiates a connection.
async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
- let ex = &self.executor;
- // Enforce a maximum of 16 concurrent connections.
+ use futures_util::stream::{FuturesUnordered, StreamExt};
+ // Enforce a maximum of 16 connections.
for chunk in entries.chunks(16) {
- let mut tasks = Vec::new();
+ let mut tasks = FuturesUnordered::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
- tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
+ tasks.push(self.clone().refresh_entry(bucket_entry.clone()))
}
- for task in tasks {
- let _ = task.await;
- }
+ while tasks.next().await.is_some() {}
}
}