diff options
author | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
commit | 78884caca030104557ca277dd3a41cefb70f5be8 (patch) | |
tree | c33650dfe44a219e395dff1966d298b58b09acb3 /p2p/src/peer | |
parent | f0729022589ee8e48b5558ab30462f95d06fe6df (diff) |
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling
its spawn method
also, define a global executor `Executor<'static>` and use static
lifetime instead of a lifetime placeholder
This improvement simplify the code for spawning a new task. There is no
need to pass the executor around.
Diffstat (limited to 'p2p/src/peer')
-rw-r--r-- | p2p/src/peer/mod.rs | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 60e76a1..85cd558 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -14,7 +14,7 @@ use karyons_core::{ async_utils::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::Endpoint; @@ -56,7 +56,7 @@ pub struct Peer { stop_chan: (Sender<Result<()>>, Receiver<Result<()>>), /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, } impl Peer { @@ -67,6 +67,7 @@ impl Peer { io_codec: IOCodec, remote_endpoint: Endpoint, conn_direction: ConnDirection, + ex: GlobalExecutor, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), @@ -76,14 +77,14 @@ impl Peer { remote_endpoint, conn_direction, protocol_events: EventSys::new(), - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), stop_chan: channel::bounded(1), }) } /// Run the peer - pub async fn run(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { - self.start_protocols(ex.clone()).await; + pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> { + self.start_protocols(ex).await; self.read_loop().await } @@ -205,7 +206,7 @@ impl Peer { } /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc<Self>, ex: Executor<'_>) { + async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) { for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { trace!("peer {} start protocol {protocol_id}", self.id); let protocol = constructor(self.clone()); @@ -213,7 +214,6 @@ impl Peer { self.protocol_ids.write().await.push(protocol_id.clone()); let selfc = self.clone(); - let exc = ex.clone(); let proto_idc = protocol_id.clone(); let on_failure = |result: TaskResult<Result<()>>| async move { @@ -227,7 +227,7 @@ impl Peer { }; self.task_group - .spawn(ex.clone(), protocol.start(exc), on_failure); + .spawn(protocol.start(ex.clone()), on_failure); } } |