diff options
Diffstat (limited to 'p2p/src/listener.rs')
-rw-r--r-- | p2p/src/listener.rs | 38 |
1 files changed, 18 insertions, 20 deletions
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index ee92536..f2391f7 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -4,7 +4,7 @@ use log::{error, info, trace}; use karyons_core::{ async_utils::{TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; @@ -18,7 +18,7 @@ use crate::{ /// Responsible for creating inbound connections with other peers. pub struct Listener { /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc<ConnectionSlots>, @@ -29,10 +29,14 @@ pub struct Listener { impl Listener { /// Creates a new Listener - pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> { + pub fn new( + connection_slots: Arc<ConnectionSlots>, + monitor: Arc<Monitor>, + ex: GlobalExecutor, + ) -> Arc<Self> { Arc::new(Self { connection_slots, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), monitor, }) } @@ -42,15 +46,14 @@ impl Listener { /// connection to the callback. /// /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( + pub async fn start<Fut>( self: &Arc<Self>, - ex: Executor<'a>, endpoint: Endpoint, // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result<Endpoint> where - Fut: Future<Output = Result<()>> + Send + 'a, + Fut: Future<Output = Result<()>> + Send + 'static, { let listener = match listen(&endpoint).await { Ok(listener) => { @@ -73,15 +76,12 @@ impl Listener { info!("Start listening on {endpoint}"); let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(listener, callback), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {endpoint} {err}"); } - }, - ); + }); Ok(resolved_endpoint) } @@ -90,14 +90,13 @@ impl Listener { self.task_group.cancel().await; } - async fn listen_loop<'a, Fut>( + async fn listen_loop<Fut>( self: Arc<Self>, - ex: Executor<'a>, listener: Box<dyn NetListener>, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result<()> where - Fut: Future<Output = Result<()>> + Send + 'a, + Fut: Future<Output = Result<()>> + Send + 'static, { loop { // Wait for an available inbound slot. @@ -134,8 +133,7 @@ impl Listener { }; let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); + self.task_group.spawn(callback(conn), on_disconnect); } } } |