From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/src/listener.rs | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) (limited to 'p2p/src/listener.rs') diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index ee92536..f2391f7 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -4,7 +4,7 @@ use log::{error, info, trace}; use karyons_core::{ async_utils::{TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; @@ -18,7 +18,7 @@ use crate::{ /// Responsible for creating inbound connections with other peers. pub struct Listener { /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc, @@ -29,10 +29,14 @@ pub struct Listener { impl Listener { /// Creates a new Listener - pub fn new(connection_slots: Arc, monitor: Arc) -> Arc { + pub fn new( + connection_slots: Arc, + monitor: Arc, + ex: GlobalExecutor, + ) -> Arc { Arc::new(Self { connection_slots, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), monitor, }) } @@ -42,15 +46,14 @@ impl Listener { /// connection to the callback. /// /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( + pub async fn start( self: &Arc, - ex: Executor<'a>, endpoint: Endpoint, // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result where - Fut: Future> + Send + 'a, + Fut: Future> + Send + 'static, { let listener = match listen(&endpoint).await { Ok(listener) => { @@ -73,15 +76,12 @@ impl Listener { info!("Start listening on {endpoint}"); let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(listener, callback), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {endpoint} {err}"); } - }, - ); + }); Ok(resolved_endpoint) } @@ -90,14 +90,13 @@ impl Listener { self.task_group.cancel().await; } - async fn listen_loop<'a, Fut>( + async fn listen_loop( self: Arc, - ex: Executor<'a>, listener: Box, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result<()> where - Fut: Future> + Send + 'a, + Fut: Future> + Send + 'static, { loop { // Wait for an available inbound slot. @@ -134,8 +133,7 @@ impl Listener { }; let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); + self.task_group.spawn(callback(conn), on_disconnect); } } } -- cgit v1.2.3