From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/src/discovery/refresh.rs | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 7582c84..a708261 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -12,7 +12,7 @@ use smol::{ use karyons_core::{ async_utils::{timeout, Backoff, TaskGroup, TaskResult}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; @@ -43,7 +43,10 @@ pub struct RefreshService { listen_endpoint: Option>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global executor + executor: GlobalExecutor, /// Holds the configuration for the P2P network. config: Arc, @@ -58,6 +61,7 @@ impl RefreshService { config: Arc, table: Arc>, monitor: Arc, + executor: GlobalExecutor, ) -> Self { let listen_endpoint = config .listen_endpoint @@ -67,41 +71,36 @@ impl RefreshService { Self { table, listen_endpoint, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, config, monitor, } } /// Start the refresh service - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { let endpoint = endpoint.read().await; let addr = endpoint.addr()?; let port = self.config.discovery_port; let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(addr.clone(), port), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(addr.clone(), port), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }, - ); + }); } let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.refresh_loop(ex.clone()), - |res| async move { + self.task_group + .spawn(selfc.refresh_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }, - ); + }); Ok(()) } @@ -121,7 +120,7 @@ impl RefreshService { /// Initiates periodic refreshing of the routing table. This function will /// select 8 random entries from each bucket in the routing table and start /// sending Ping messages to the entries. - async fn refresh_loop(self: Arc, ex: Executor<'_>) -> Result<()> { + async fn refresh_loop(self: Arc) -> Result<()> { let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { timer.next().await; @@ -140,13 +139,14 @@ impl RefreshService { } drop(table); - self.clone().do_refresh(&entries, ex.clone()).await; + self.clone().do_refresh(&entries).await; } } /// Iterates over the entries and spawns a new task for each entry to /// initiate a connection attempt to that entry. - async fn do_refresh(self: Arc, entries: &[BucketEntry], ex: Executor<'_>) { + async fn do_refresh(self: Arc, entries: &[BucketEntry]) { + let ex = &self.executor; for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { -- cgit v1.2.3