From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/src/peer/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'p2p/src/peer') diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 60e76a1..85cd558 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -14,7 +14,7 @@ use karyons_core::{ async_utils::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::Endpoint; @@ -56,7 +56,7 @@ pub struct Peer { stop_chan: (Sender>, Receiver>), /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, } impl Peer { @@ -67,6 +67,7 @@ impl Peer { io_codec: IOCodec, remote_endpoint: Endpoint, conn_direction: ConnDirection, + ex: GlobalExecutor, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), @@ -76,14 +77,14 @@ impl Peer { remote_endpoint, conn_direction, protocol_events: EventSys::new(), - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), stop_chan: channel::bounded(1), }) } /// Run the peer - pub async fn run(self: Arc, ex: Executor<'_>) -> Result<()> { - self.start_protocols(ex.clone()).await; + pub async fn run(self: Arc, ex: GlobalExecutor) -> Result<()> { + self.start_protocols(ex).await; self.read_loop().await } @@ -205,7 +206,7 @@ impl Peer { } /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc, ex: Executor<'_>) { + async fn start_protocols(self: &Arc, ex: GlobalExecutor) { for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { trace!("peer {} start protocol {protocol_id}", self.id); let protocol = constructor(self.clone()); @@ -213,7 +214,6 @@ impl Peer { self.protocol_ids.write().await.push(protocol_id.clone()); let selfc = self.clone(); - let exc = ex.clone(); let proto_idc = protocol_id.clone(); let on_failure = |result: TaskResult>| async move { @@ -227,7 +227,7 @@ impl Peer { }; self.task_group - .spawn(ex.clone(), protocol.start(exc), on_failure); + .spawn(protocol.start(ex.clone()), on_failure); } } -- cgit v1.2.3