aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/protocols/ping.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/protocols/ping.rs')
-rw-r--r--p2p/src/protocols/ping.rs23
1 files changed, 9 insertions, 14 deletions
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index f04e059..654644a 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -1,23 +1,19 @@
use std::{sync::Arc, time::Duration};
+use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use bincode::{Decode, Encode};
use log::trace;
use rand::{rngs::OsRng, RngCore};
-use smol::{
- channel,
- channel::{Receiver, Sender},
- stream::StreamExt,
- Timer,
-};
use karyon_core::{
- async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult},
+ async_runtime::Executor,
+ async_util::{select, sleep, timeout, Either, TaskGroup, TaskResult},
event::EventListener,
util::decode,
};
-use karyon_net::NetError;
+use karyon_net::Error as NetError;
use crate::{
peer::ArcPeer,
@@ -38,12 +34,12 @@ pub struct PingProtocol {
peer: ArcPeer,
ping_interval: u64,
ping_timeout: u64,
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
}
impl PingProtocol {
#[allow(clippy::new_ret_no_self)]
- pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol {
+ pub fn new(peer: ArcPeer, executor: Executor) -> ArcProtocol {
let ping_interval = peer.config().ping_interval;
let ping_timeout = peer.config().ping_timeout;
Arc::new(Self {
@@ -87,12 +83,11 @@ impl PingProtocol {
}
async fn ping_loop(self: Arc<Self>, chan: Receiver<[u8; 32]>) -> Result<()> {
- let mut timer = Timer::interval(Duration::from_secs(self.ping_interval));
let rng = &mut OsRng;
let mut retry = 0;
while retry < MAX_FAILUERS {
- timer.next().await;
+ sleep(Duration::from_secs(self.ping_interval)).await;
let mut ping_nonce: [u8; 32] = [0; 32];
rng.fill_bytes(&mut ping_nonce);
@@ -130,8 +125,8 @@ impl Protocol for PingProtocol {
async fn start(self: Arc<Self>) -> Result<()> {
trace!("Start Ping protocol");
- let (pong_chan, pong_chan_recv) = channel::bounded(1);
- let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1);
+ let (pong_chan, pong_chan_recv) = async_channel::bounded(1);
+ let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1);
let selfc = self.clone();
self.task_group.spawn(