aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3 /p2p/src/peer/mod.rs
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
Diffstat (limited to 'p2p/src/peer/mod.rs')
-rw-r--r--p2p/src/peer/mod.rs16
1 files changed, 8 insertions, 8 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 60e76a1..85cd558 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -14,7 +14,7 @@ use karyons_core::{
async_utils::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
utils::{decode, encode},
- Executor,
+ GlobalExecutor,
};
use karyons_net::Endpoint;
@@ -56,7 +56,7 @@ pub struct Peer {
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
}
impl Peer {
@@ -67,6 +67,7 @@ impl Peer {
io_codec: IOCodec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
+ ex: GlobalExecutor,
) -> ArcPeer {
Arc::new(Peer {
id: id.clone(),
@@ -76,14 +77,14 @@ impl Peer {
remote_endpoint,
conn_direction,
protocol_events: EventSys::new(),
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
stop_chan: channel::bounded(1),
})
}
/// Run the peer
- pub async fn run(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
- self.start_protocols(ex.clone()).await;
+ pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> {
+ self.start_protocols(ex).await;
self.read_loop().await
}
@@ -205,7 +206,7 @@ impl Peer {
}
/// Start running the protocols for this peer connection.
- async fn start_protocols(self: &Arc<Self>, ex: Executor<'_>) {
+ async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) {
for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() {
trace!("peer {} start protocol {protocol_id}", self.id);
let protocol = constructor(self.clone());
@@ -213,7 +214,6 @@ impl Peer {
self.protocol_ids.write().await.push(protocol_id.clone());
let selfc = self.clone();
- let exc = ex.clone();
let proto_idc = protocol_id.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
@@ -227,7 +227,7 @@ impl Peer {
};
self.task_group
- .spawn(ex.clone(), protocol.start(exc), on_failure);
+ .spawn(protocol.start(ex.clone()), on_failure);
}
}