diff options
author | hozan23 <hozan23@proton.me> | 2023-11-29 00:15:10 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-29 00:15:10 +0300 |
commit | 2b032229e46293af92db798a36793c6b8b97baee (patch) | |
tree | 7b1304c952ff34604e9114d9d15c4687775c714b /p2p/src/protocols | |
parent | 21e76cf87153c038909d95ff40d982b70003e2fa (diff) |
p2p/protocol: improve the Protocol API
Diffstat (limited to 'p2p/src/protocols')
-rw-r--r-- | p2p/src/protocols/ping.rs | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index 0a5488d..ec7afe2 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -15,7 +15,7 @@ use karyons_core::{ async_util::{select, timeout, Either, TaskGroup, TaskResult}, event::EventListener, util::decode, - Executor, + GlobalExecutor, }; use karyons_net::NetError; @@ -39,17 +39,19 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, + task_group: TaskGroup<'static>, } impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { peer, ping_interval, ping_timeout, + task_group: TaskGroup::new(executor), }) } @@ -126,16 +128,14 @@ impl PingProtocol { #[async_trait] impl Protocol for PingProtocol { - async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn start(self: Arc<Self>) -> Result<()> { trace!("Start Ping protocol"); - let task_group = TaskGroup::new(ex); - let (pong_chan, pong_chan_recv) = channel::bounded(1); let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1); let selfc = self.clone(); - task_group.spawn( + self.task_group.spawn( selfc.clone().ping_loop(pong_chan_recv.clone()), |res| async move { if let TaskResult::Completed(result) = res { @@ -148,7 +148,7 @@ impl Protocol for PingProtocol { let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; listener.cancel().await; - task_group.cancel().await; + self.task_group.cancel().await; match result { Either::Left(res) => { |