From 5c0abab1b7bf0f2858c451d6f0efc7ca0e138fc6 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 29 Jun 2024 21:16:46 +0200 Subject: use shadown variables to name clones and place them between {} when spawning new tasks --- p2p/src/discovery/refresh.rs | 45 ++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index c1d222b..b4f5396 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -38,9 +38,6 @@ pub struct RefreshService { /// Managing spawned tasks. task_group: TaskGroup, - /// A global executor - executor: Executor, - /// Holds the configuration for the P2P network. config: Arc, @@ -65,7 +62,6 @@ impl RefreshService { table, listen_endpoint, task_group: TaskGroup::with_executor(executor.clone()), - executor, config, monitor, } @@ -74,24 +70,32 @@ impl RefreshService { /// Start the refresh service pub async fn start(self: &Arc) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { - let endpoint = endpoint.read().await; + let endpoint = endpoint.read().await.clone(); - let selfc = self.clone(); - self.task_group - .spawn(selfc.listen_loop(endpoint.clone()), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop(endpoint).await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }); + }, + ); } - let selfc = self.clone(); - self.task_group - .spawn(selfc.refresh_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.refresh_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -133,25 +137,22 @@ impl RefreshService { } } - /// Iterates over the entries and spawns a new task for each entry to - /// initiate a connection attempt. + /// Iterates over the entries and initiates a connection. async fn do_refresh(self: Arc, entries: &[BucketEntry]) { - let ex = &self.executor; - // Enforce a maximum of 16 concurrent connections. + use futures_util::stream::{FuturesUnordered, StreamExt}; + // Enforce a maximum of 16 connections. for chunk in entries.chunks(16) { - let mut tasks = Vec::new(); + let mut tasks = FuturesUnordered::new(); for bucket_entry in chunk { if bucket_entry.failures >= MAX_FAILURES { self.table.remove_entry(&bucket_entry.entry.key); continue; } - tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) + tasks.push(self.clone().refresh_entry(bucket_entry.clone())) } - for task in tasks { - let _ = task.await; - } + while tasks.next().await.is_some() {} } } -- cgit v1.2.3