From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/src/discovery/lookup.rs | 15 +++++++------ p2p/src/discovery/mod.rs | 51 +++++++++++++++++++++++--------------------- p2p/src/discovery/refresh.rs | 38 ++++++++++++++++----------------- 3 files changed, 55 insertions(+), 49 deletions(-) (limited to 'p2p/src/discovery') diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 94da900..52aa339 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,7 +5,7 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use smol::lock::{Mutex, RwLock}; -use karyons_core::{async_utils::timeout, utils::decode, Executor}; +use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor}; use karyons_net::{Conn, Endpoint}; @@ -59,15 +59,18 @@ impl LookupService { table: Arc>, config: Arc, monitor: Arc, + ex: GlobalExecutor, ) -> Self { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); + let connector = Connector::new( config.lookup_connect_retries, outbound_slots.clone(), monitor.clone(), + ex, ); let listen_endpoint = config @@ -88,8 +91,8 @@ impl LookupService { } /// Start the lookup service. - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { - self.start_listener(ex).await?; + pub async fn start(self: &Arc) -> Result<()> { + self.start_listener().await?; Ok(()) } @@ -233,7 +236,7 @@ impl LookupService { } /// Start a listener. - async fn start_listener(self: &Arc, ex: Executor<'_>) -> Result<()> { + async fn start_listener(self: &Arc) -> Result<()> { let addr = match &self.listen_endpoint { Some(a) => a.read().await.addr()?.clone(), None => return Ok(()), @@ -248,7 +251,7 @@ impl LookupService { Ok(()) }; - self.listener.start(ex, endpoint.clone(), callback).await?; + self.listener.start(endpoint.clone(), callback).await?; Ok(()) } diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7b8e7dc..7d37eec 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -9,7 +9,7 @@ use smol::lock::Mutex; use karyons_core::{ async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{Conn, Endpoint}; @@ -57,7 +57,7 @@ pub struct Discovery { pub(crate) outbound_slots: Arc, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Holds the configuration for the P2P network. config: Arc, @@ -70,6 +70,7 @@ impl Discovery { conn_queue: Arc, config: Arc, monitor: Arc, + ex: GlobalExecutor, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -77,16 +78,23 @@ impl Discovery { let table_key = peer_id.0; let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); - let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); - let lookup_service = - LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); + let refresh_service = + RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); + let lookup_service = LookupService::new( + peer_id, + table.clone(), + config.clone(), + monitor.clone(), + ex.clone(), + ); let connector = Connector::new( config.max_connect_retries, outbound_slots.clone(), monitor.clone(), + ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -97,13 +105,13 @@ impl Discovery { outbound_slots, connector, listener, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), config, }) } /// Start the Discovery - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc) -> Result<()> { // Check if the listen_endpoint is provided, and if so, start a listener. if let Some(endpoint) = &self.config.listen_endpoint { // Return an error if the discovery port is set to 0. @@ -113,7 +121,7 @@ impl Discovery { )); } - let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; + let resolved_endpoint = self.start_listener(endpoint).await?; if endpoint.addr()? != resolved_endpoint.addr()? { info!("Resolved listen endpoint: {resolved_endpoint}"); @@ -127,19 +135,19 @@ impl Discovery { } // Start the lookup service - self.lookup_service.start(ex.clone()).await?; + self.lookup_service.start().await?; // Start the refresh service - self.refresh_service.start(ex.clone()).await?; + self.refresh_service.start().await?; // Attempt to manually connect to peer endpoints provided in the Config. for endpoint in self.config.peer_endpoints.iter() { - let _ = self.connect(endpoint, None, ex.clone()).await; + let _ = self.connect(endpoint, None).await; } // Start connect loop let selfc = self.clone(); self.task_group - .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { + .spawn(selfc.connect_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } @@ -159,18 +167,14 @@ impl Discovery { } /// Start a listener and on success, return the resolved endpoint. - async fn start_listener( - self: &Arc, - endpoint: &Endpoint, - ex: Executor<'_>, - ) -> Result { + async fn start_listener(self: &Arc, endpoint: &Endpoint) -> Result { let selfc = self.clone(); let callback = |conn: Conn| async move { selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; Ok(()) }; - let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; + let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; Ok(resolved_endpoint) } @@ -180,7 +184,7 @@ impl Discovery { /// /// This will perform a backoff to prevent getting stuck in the loop /// if the seeding process couldn't find any peers. - async fn connect_loop(self: Arc, ex: Executor<'_>) -> Result<()> { + async fn connect_loop(self: Arc) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { let random_entry = self.random_entry(PENDING_ENTRY).await; @@ -188,8 +192,7 @@ impl Discovery { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); - self.connect(&endpoint, Some(entry.key.into()), ex.clone()) - .await; + self.connect(&endpoint, Some(entry.key.into())).await; } None => { backoff.sleep().await; @@ -200,7 +203,7 @@ impl Discovery { } /// Connect to the given endpoint using the connector - async fn connect(self: &Arc, endpoint: &Endpoint, pid: Option, ex: Executor<'_>) { + async fn connect(self: &Arc, endpoint: &Endpoint, pid: Option) { let selfc = self.clone(); let pid_cloned = pid.clone(); let cback = |conn: Conn| async move { @@ -211,7 +214,7 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(ex, endpoint, cback).await; + let res = self.connector.connect_with_cback(endpoint, cback).await; if let Some(pid) = &pid { match res { diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 7582c84..a708261 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -12,7 +12,7 @@ use smol::{ use karyons_core::{ async_utils::{timeout, Backoff, TaskGroup, TaskResult}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; @@ -43,7 +43,10 @@ pub struct RefreshService { listen_endpoint: Option>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global executor + executor: GlobalExecutor, /// Holds the configuration for the P2P network. config: Arc, @@ -58,6 +61,7 @@ impl RefreshService { config: Arc, table: Arc>, monitor: Arc, + executor: GlobalExecutor, ) -> Self { let listen_endpoint = config .listen_endpoint @@ -67,41 +71,36 @@ impl RefreshService { Self { table, listen_endpoint, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, config, monitor, } } /// Start the refresh service - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { let endpoint = endpoint.read().await; let addr = endpoint.addr()?; let port = self.config.discovery_port; let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(addr.clone(), port), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(addr.clone(), port), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }, - ); + }); } let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.refresh_loop(ex.clone()), - |res| async move { + self.task_group + .spawn(selfc.refresh_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }, - ); + }); Ok(()) } @@ -121,7 +120,7 @@ impl RefreshService { /// Initiates periodic refreshing of the routing table. This function will /// select 8 random entries from each bucket in the routing table and start /// sending Ping messages to the entries. - async fn refresh_loop(self: Arc, ex: Executor<'_>) -> Result<()> { + async fn refresh_loop(self: Arc) -> Result<()> { let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { timer.next().await; @@ -140,13 +139,14 @@ impl RefreshService { } drop(table); - self.clone().do_refresh(&entries, ex.clone()).await; + self.clone().do_refresh(&entries).await; } } /// Iterates over the entries and spawns a new task for each entry to /// initiate a connection attempt to that entry. - async fn do_refresh(self: Arc, entries: &[BucketEntry], ex: Executor<'_>) { + async fn do_refresh(self: Arc, entries: &[BucketEntry]) { + let ex = &self.executor; for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { -- cgit v1.2.3