From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- p2p/src/discovery/refresh.rs | 92 ++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 47 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 035a581..0c49ac2 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -3,31 +3,28 @@ use std::{sync::Arc, time::Duration}; use bincode::{Decode, Encode}; use log::{error, info, trace}; use rand::{rngs::OsRng, RngCore}; -use smol::{ - lock::{Mutex, RwLock}, - stream::StreamExt, - Timer, -}; use karyon_core::{ - async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult}, - util::{decode, encode}, + async_runtime::{ + lock::{Mutex, RwLock}, + Executor, + }, + async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult}, }; -use karyon_net::{udp, Connection, Endpoint, NetError}; - -/// Maximum failures for an entry before removing it from the routing table. -pub const MAX_FAILURES: u32 = 3; - -/// Ping message size -const PINGMSG_SIZE: usize = 32; +use karyon_net::{udp, Connection, Endpoint, Error as NetError}; use crate::{ + codec::RefreshMsgCodec, + message::RefreshMsg, monitor::{ConnEvent, DiscoveryEvent, Monitor}, routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY}, Config, Error, Result, }; +/// Maximum failures for an entry before removing it from the routing table. +pub const MAX_FAILURES: u32 = 3; + #[derive(Decode, Encode, Debug, Clone)] pub struct PingMsg(pub [u8; 32]); @@ -42,10 +39,10 @@ pub struct RefreshService { listen_endpoint: Option>, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// A global executor - executor: Executor<'static>, + executor: Executor, /// Holds the configuration for the P2P network. config: Arc, @@ -60,7 +57,7 @@ impl RefreshService { config: Arc, table: Arc>, monitor: Arc, - executor: Executor<'static>, + executor: Executor, ) -> Self { let listen_endpoint = config .listen_endpoint @@ -118,9 +115,8 @@ impl RefreshService { /// selects the first 8 entries (oldest entries) from each bucket in the /// routing table and starts sending Ping messages to the collected entries. async fn refresh_loop(self: Arc) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { - timer.next().await; + sleep(Duration::from_secs(self.config.refresh_interval)).await; trace!("Start refreshing the routing table..."); self.monitor @@ -162,7 +158,7 @@ impl RefreshService { } for task in tasks { - task.await; + let _ = task.await; } } } @@ -193,10 +189,10 @@ impl RefreshService { async fn connect(&self, entry: &Entry) -> Result<()> { let mut retry = 0; let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port); - let conn = udp::dial(&endpoint).await?; + let conn = udp::dial(&endpoint, Default::default(), RefreshMsgCodec {}).await?; let backoff = Backoff::new(100, 5000); while retry < self.config.refresh_connect_retries { - match self.send_ping_msg(&conn).await { + match self.send_ping_msg(&conn, &endpoint).await { Ok(()) => return Ok(()), Err(Error::KaryonNet(NetError::Timeout)) => { retry += 1; @@ -214,7 +210,7 @@ impl RefreshService { /// Set up a UDP listener and start listening for Ping messages from other /// peers. async fn listen_loop(self: Arc, endpoint: Endpoint) -> Result<()> { - let conn = match udp::listen(&endpoint).await { + let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -240,46 +236,48 @@ impl RefreshService { } /// Listen to receive a Ping message and respond with a Pong message. - async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { - let mut buf = [0; PINGMSG_SIZE]; - let (_, endpoint) = conn.recv_from(&mut buf).await?; - + async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { + let (msg, endpoint) = conn.recv().await?; self.monitor .notify(&ConnEvent::Accepted(endpoint.clone()).into()) .await; - let (ping_msg, _) = decode::(&buf)?; - - let pong_msg = PongMsg(ping_msg.0); - let buffer = encode(&pong_msg)?; - - conn.send_to(&buffer, &endpoint).await?; + match msg { + RefreshMsg::Ping(m) => { + let pong_msg = RefreshMsg::Pong(m); + conn.send((pong_msg, endpoint.clone())).await?; + } + RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())), + } self.monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) + .notify(&ConnEvent::Disconnected(endpoint).into()) .await; Ok(()) } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { + async fn send_ping_msg( + &self, + conn: &udp::UdpConn, + endpoint: &Endpoint, + ) -> Result<()> { let mut nonce: [u8; 32] = [0; 32]; RngCore::fill_bytes(&mut OsRng, &mut nonce); + conn.send((RefreshMsg::Ping(nonce), endpoint.clone())) + .await?; - let ping_msg = PingMsg(nonce); - let buffer = encode(&ping_msg)?; - conn.write(&buffer).await?; - - let buf = &mut [0; PINGMSG_SIZE]; let t = Duration::from_secs(self.config.refresh_response_timeout); - timeout(t, conn.read(buf)).await??; - - let (pong_msg, _) = decode::(buf)?; + let (msg, _) = timeout(t, conn.recv()).await??; - if ping_msg.0 != pong_msg.0 { - return Err(Error::InvalidPongMsg); + match msg { + RefreshMsg::Pong(n) => { + if n != nonce { + return Err(Error::InvalidPongMsg); + } + Ok(()) + } + _ => Err(Error::InvalidMsg("Unexpected ping msg".into())), } - - Ok(()) } } -- cgit v1.2.3