diff options
author | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
commit | 78884caca030104557ca277dd3a41cefb70f5be8 (patch) | |
tree | c33650dfe44a219e395dff1966d298b58b09acb3 /p2p/src/discovery/mod.rs | |
parent | f0729022589ee8e48b5558ab30462f95d06fe6df (diff) |
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling
its spawn method
also, define a global executor `Executor<'static>` and use static
lifetime instead of a lifetime placeholder
This improvement simplify the code for spawning a new task. There is no
need to pass the executor around.
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r-- | p2p/src/discovery/mod.rs | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7b8e7dc..7d37eec 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -9,7 +9,7 @@ use smol::lock::Mutex; use karyons_core::{ async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{Conn, Endpoint}; @@ -57,7 +57,7 @@ pub struct Discovery { pub(crate) outbound_slots: Arc<ConnectionSlots>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -70,6 +70,7 @@ impl Discovery { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, + ex: GlobalExecutor, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -77,16 +78,23 @@ impl Discovery { let table_key = peer_id.0; let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); - let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); - let lookup_service = - LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); + let refresh_service = + RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); + let lookup_service = LookupService::new( + peer_id, + table.clone(), + config.clone(), + monitor.clone(), + ex.clone(), + ); let connector = Connector::new( config.max_connect_retries, outbound_slots.clone(), monitor.clone(), + ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -97,13 +105,13 @@ impl Discovery { outbound_slots, connector, listener, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), config, }) } /// Start the Discovery - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc<Self>) -> Result<()> { // Check if the listen_endpoint is provided, and if so, start a listener. if let Some(endpoint) = &self.config.listen_endpoint { // Return an error if the discovery port is set to 0. @@ -113,7 +121,7 @@ impl Discovery { )); } - let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; + let resolved_endpoint = self.start_listener(endpoint).await?; if endpoint.addr()? != resolved_endpoint.addr()? { info!("Resolved listen endpoint: {resolved_endpoint}"); @@ -127,19 +135,19 @@ impl Discovery { } // Start the lookup service - self.lookup_service.start(ex.clone()).await?; + self.lookup_service.start().await?; // Start the refresh service - self.refresh_service.start(ex.clone()).await?; + self.refresh_service.start().await?; // Attempt to manually connect to peer endpoints provided in the Config. for endpoint in self.config.peer_endpoints.iter() { - let _ = self.connect(endpoint, None, ex.clone()).await; + let _ = self.connect(endpoint, None).await; } // Start connect loop let selfc = self.clone(); self.task_group - .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { + .spawn(selfc.connect_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } @@ -159,18 +167,14 @@ impl Discovery { } /// Start a listener and on success, return the resolved endpoint. - async fn start_listener( - self: &Arc<Self>, - endpoint: &Endpoint, - ex: Executor<'_>, - ) -> Result<Endpoint> { + async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); let callback = |conn: Conn| async move { selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; Ok(()) }; - let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; + let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; Ok(resolved_endpoint) } @@ -180,7 +184,7 @@ impl Discovery { /// /// This will perform a backoff to prevent getting stuck in the loop /// if the seeding process couldn't find any peers. - async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn connect_loop(self: Arc<Self>) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { let random_entry = self.random_entry(PENDING_ENTRY).await; @@ -188,8 +192,7 @@ impl Discovery { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); - self.connect(&endpoint, Some(entry.key.into()), ex.clone()) - .await; + self.connect(&endpoint, Some(entry.key.into())).await; } None => { backoff.sleep().await; @@ -200,7 +203,7 @@ impl Discovery { } /// Connect to the given endpoint using the connector - async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) { + async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { let selfc = self.clone(); let pid_cloned = pid.clone(); let cback = |conn: Conn| async move { @@ -211,7 +214,7 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(ex, endpoint, cback).await; + let res = self.connector.connect_with_cback(endpoint, cback).await; if let Some(pid) = &pid { match res { |