aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs92
1 files changed, 45 insertions, 47 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 035a581..0c49ac2 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -3,31 +3,28 @@ use std::{sync::Arc, time::Duration};
use bincode::{Decode, Encode};
use log::{error, info, trace};
use rand::{rngs::OsRng, RngCore};
-use smol::{
- lock::{Mutex, RwLock},
- stream::StreamExt,
- Timer,
-};
use karyon_core::{
- async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult},
- util::{decode, encode},
+ async_runtime::{
+ lock::{Mutex, RwLock},
+ Executor,
+ },
+ async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
-use karyon_net::{udp, Connection, Endpoint, NetError};
-
-/// Maximum failures for an entry before removing it from the routing table.
-pub const MAX_FAILURES: u32 = 3;
-
-/// Ping message size
-const PINGMSG_SIZE: usize = 32;
+use karyon_net::{udp, Connection, Endpoint, Error as NetError};
use crate::{
+ codec::RefreshMsgCodec,
+ message::RefreshMsg,
monitor::{ConnEvent, DiscoveryEvent, Monitor},
routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY},
Config, Error, Result,
};
+/// Maximum failures for an entry before removing it from the routing table.
+pub const MAX_FAILURES: u32 = 3;
+
#[derive(Decode, Encode, Debug, Clone)]
pub struct PingMsg(pub [u8; 32]);
@@ -42,10 +39,10 @@ pub struct RefreshService {
listen_endpoint: Option<RwLock<Endpoint>>,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// A global executor
- executor: Executor<'static>,
+ executor: Executor,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -60,7 +57,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
- executor: Executor<'static>,
+ executor: Executor,
) -> Self {
let listen_endpoint = config
.listen_endpoint
@@ -118,9 +115,8 @@ impl RefreshService {
/// selects the first 8 entries (oldest entries) from each bucket in the
/// routing table and starts sending Ping messages to the collected entries.
async fn refresh_loop(self: Arc<Self>) -> Result<()> {
- let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
- timer.next().await;
+ sleep(Duration::from_secs(self.config.refresh_interval)).await;
trace!("Start refreshing the routing table...");
self.monitor
@@ -162,7 +158,7 @@ impl RefreshService {
}
for task in tasks {
- task.await;
+ let _ = task.await;
}
}
}
@@ -193,10 +189,10 @@ impl RefreshService {
async fn connect(&self, entry: &Entry) -> Result<()> {
let mut retry = 0;
let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port);
- let conn = udp::dial(&endpoint).await?;
+ let conn = udp::dial(&endpoint, Default::default(), RefreshMsgCodec {}).await?;
let backoff = Backoff::new(100, 5000);
while retry < self.config.refresh_connect_retries {
- match self.send_ping_msg(&conn).await {
+ match self.send_ping_msg(&conn, &endpoint).await {
Ok(()) => return Ok(()),
Err(Error::KaryonNet(NetError::Timeout)) => {
retry += 1;
@@ -214,7 +210,7 @@ impl RefreshService {
/// Set up a UDP listener and start listening for Ping messages from other
/// peers.
async fn listen_loop(self: Arc<Self>, endpoint: Endpoint) -> Result<()> {
- let conn = match udp::listen(&endpoint).await {
+ let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {
Ok(c) => {
self.monitor
.notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -240,46 +236,48 @@ impl RefreshService {
}
/// Listen to receive a Ping message and respond with a Pong message.
- async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
- let mut buf = [0; PINGMSG_SIZE];
- let (_, endpoint) = conn.recv_from(&mut buf).await?;
-
+ async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {
+ let (msg, endpoint) = conn.recv().await?;
self.monitor
.notify(&ConnEvent::Accepted(endpoint.clone()).into())
.await;
- let (ping_msg, _) = decode::<PingMsg>(&buf)?;
-
- let pong_msg = PongMsg(ping_msg.0);
- let buffer = encode(&pong_msg)?;
-
- conn.send_to(&buffer, &endpoint).await?;
+ match msg {
+ RefreshMsg::Ping(m) => {
+ let pong_msg = RefreshMsg::Pong(m);
+ conn.send((pong_msg, endpoint.clone())).await?;
+ }
+ RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())),
+ }
self.monitor
- .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
+ .notify(&ConnEvent::Disconnected(endpoint).into())
.await;
Ok(())
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
+ async fn send_ping_msg(
+ &self,
+ conn: &udp::UdpConn<RefreshMsgCodec>,
+ endpoint: &Endpoint,
+ ) -> Result<()> {
let mut nonce: [u8; 32] = [0; 32];
RngCore::fill_bytes(&mut OsRng, &mut nonce);
+ conn.send((RefreshMsg::Ping(nonce), endpoint.clone()))
+ .await?;
- let ping_msg = PingMsg(nonce);
- let buffer = encode(&ping_msg)?;
- conn.write(&buffer).await?;
-
- let buf = &mut [0; PINGMSG_SIZE];
let t = Duration::from_secs(self.config.refresh_response_timeout);
- timeout(t, conn.read(buf)).await??;
-
- let (pong_msg, _) = decode::<PongMsg>(buf)?;
+ let (msg, _) = timeout(t, conn.recv()).await??;
- if ping_msg.0 != pong_msg.0 {
- return Err(Error::InvalidPongMsg);
+ match msg {
+ RefreshMsg::Pong(n) => {
+ if n != nonce {
+ return Err(Error::InvalidPongMsg);
+ }
+ Ok(())
+ }
+ _ => Err(Error::InvalidMsg("Unexpected ping msg".into())),
}
-
- Ok(())
}
}