diff options
Diffstat (limited to 'p2p/src/protocols')
-rw-r--r-- | p2p/src/protocols/ping.rs | 23 |
1 files changed, 9 insertions, 14 deletions
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index f04e059..654644a 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -1,23 +1,19 @@ use std::{sync::Arc, time::Duration}; +use async_channel::{Receiver, Sender}; use async_trait::async_trait; use bincode::{Decode, Encode}; use log::trace; use rand::{rngs::OsRng, RngCore}; -use smol::{ - channel, - channel::{Receiver, Sender}, - stream::StreamExt, - Timer, -}; use karyon_core::{ - async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{select, sleep, timeout, Either, TaskGroup, TaskResult}, event::EventListener, util::decode, }; -use karyon_net::NetError; +use karyon_net::Error as NetError; use crate::{ peer::ArcPeer, @@ -38,12 +34,12 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, - task_group: TaskGroup<'static>, + task_group: TaskGroup, } impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: Executor) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { @@ -87,12 +83,11 @@ impl PingProtocol { } async fn ping_loop(self: Arc<Self>, chan: Receiver<[u8; 32]>) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.ping_interval)); let rng = &mut OsRng; let mut retry = 0; while retry < MAX_FAILUERS { - timer.next().await; + sleep(Duration::from_secs(self.ping_interval)).await; let mut ping_nonce: [u8; 32] = [0; 32]; rng.fill_bytes(&mut ping_nonce); @@ -130,8 +125,8 @@ impl Protocol for PingProtocol { async fn start(self: Arc<Self>) -> Result<()> { trace!("Start Ping protocol"); - let (pong_chan, pong_chan_recv) = channel::bounded(1); - let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1); + let (pong_chan, pong_chan_recv) = async_channel::bounded(1); + let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1); let selfc = self.clone(); self.task_group.spawn( |