diff options
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r-- | p2p/src/discovery/lookup.rs | 366 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 262 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 289 |
3 files changed, 917 insertions, 0 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs new file mode 100644 index 0000000..f404133 --- /dev/null +++ b/p2p/src/discovery/lookup.rs @@ -0,0 +1,366 @@ +use std::{sync::Arc, time::Duration}; + +use futures_util::{stream::FuturesUnordered, StreamExt}; +use log::{error, trace}; +use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; +use smol::lock::{Mutex, RwLock}; + +use karyons_core::{async_utils::timeout, utils::decode, Executor}; + +use karyons_net::{Conn, Endpoint}; + +use crate::{ + io_codec::IOCodec, + message::{ + get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, + ShutdownMsg, + }, + monitor::{ConnEvent, DiscoveryEvent, Monitor}, + net::{ConnectionSlots, Connector, Listener}, + routing_table::RoutingTable, + utils::version_match, + Config, Error, PeerID, Result, +}; + +/// Maximum number of peers that can be returned in a PeersMsg. +pub const MAX_PEERS_IN_PEERSMSG: usize = 10; + +pub struct LookupService { + /// Peer's ID + id: PeerID, + + /// Routing Table + table: Arc<Mutex<RoutingTable>>, + + /// Listener + listener: Arc<Listener>, + /// Connector + connector: Arc<Connector>, + + /// Outbound slots. + outbound_slots: Arc<ConnectionSlots>, + + /// Resolved listen endpoint + listen_endpoint: Option<RwLock<Endpoint>>, + + /// Holds the configuration for the P2P network. + config: Arc<Config>, + + /// Responsible for network and system monitoring. + monitor: Arc<Monitor>, +} + +impl LookupService { + /// Creates a new lookup service + pub fn new( + id: &PeerID, + table: Arc<Mutex<RoutingTable>>, + config: Arc<Config>, + monitor: Arc<Monitor>, + ) -> Self { + let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); + let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); + + let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let connector = Connector::new( + config.lookup_connect_retries, + outbound_slots.clone(), + monitor.clone(), + ); + + let listen_endpoint = config + .listen_endpoint + .as_ref() + .map(|endpoint| RwLock::new(endpoint.clone())); + + Self { + id: id.clone(), + table, + listener, + connector, + outbound_slots, + listen_endpoint, + config, + monitor, + } + } + + /// Start the lookup service. + pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + self.start_listener(ex).await?; + Ok(()) + } + + /// Set the resolved listen endpoint. + pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { + if let Some(endpoint) = &self.listen_endpoint { + *endpoint.write().await = resolved_endpoint.clone(); + } + } + + /// Shuts down the lookup service. + pub async fn shutdown(&self) { + self.connector.shutdown().await; + self.listener.shutdown().await; + } + + /// Starts iterative lookup and populate the routing table. + /// + /// This method begins by generating a random peer ID and connecting to the + /// provided endpoint. It then sends a FindPeer message containing the + /// randomly generated peer ID. Upon receiving peers from the initial lookup, + /// it starts connecting to these received peers and sends them a FindPeer + /// message that contains our own peer ID. + pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> { + trace!("Lookup started {endpoint}"); + self.monitor + .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) + .await; + + let mut random_peers = vec![]; + if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await { + self.monitor + .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) + .await; + return Err(err); + }; + + let mut peer_buffer = vec![]; + self.self_lookup(&random_peers, &mut peer_buffer).await; + + while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG { + match random_peers.pop() { + Some(p) => peer_buffer.push(p), + None => break, + } + } + + for peer in peer_buffer.iter() { + let mut table = self.table.lock().await; + let result = table.add_entry(peer.clone().into()); + trace!("Add entry {:?}", result); + } + + self.monitor + .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) + .await; + + Ok(()) + } + + /// Starts a random lookup + /// + /// This will perfom lookup on a random generated PeerID + async fn random_lookup( + &self, + endpoint: &Endpoint, + random_peers: &mut Vec<PeerMsg>, + ) -> Result<()> { + for _ in 0..2 { + let peer_id = PeerID::random(); + let peers = self.connect(&peer_id, endpoint.clone()).await?; + for peer in peers { + if random_peers.contains(&peer) + || peer.peer_id == self.id + || self.table.lock().await.contains_key(&peer.peer_id.0) + { + continue; + } + + random_peers.push(peer); + } + } + + Ok(()) + } + + /// Starts a self lookup + async fn self_lookup(&self, random_peers: &Vec<PeerMsg>, peer_buffer: &mut Vec<PeerMsg>) { + let mut tasks = FuturesUnordered::new(); + for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { + let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); + tasks.push(self.connect(&self.id, endpoint)) + } + + while let Some(result) = tasks.next().await { + match result { + Ok(peers) => peer_buffer.extend(peers), + Err(err) => { + error!("Failed to do self lookup: {err}"); + } + } + } + } + + /// Connects to the given endpoint + async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> { + let conn = self.connector.connect(&endpoint).await?; + let io_codec = IOCodec::new(conn); + let result = self.handle_outbound(io_codec, peer_id).await; + + self.monitor + .notify(&ConnEvent::Disconnected(endpoint).into()) + .await; + self.outbound_slots.remove().await; + + result + } + + /// Handles outbound connection + async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> { + trace!("Send Ping msg"); + self.send_ping_msg(&io_codec).await?; + + trace!("Send FindPeer msg"); + let peers = self.send_findpeer_msg(&io_codec, peer_id).await?; + + if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { + return Err(Error::Lookup("Received too many peers in PeersMsg")); + } + + trace!("Send Peer msg"); + if let Some(endpoint) = &self.listen_endpoint { + self.send_peer_msg(&io_codec, endpoint.read().await.clone()) + .await?; + } + + trace!("Send Shutdown msg"); + self.send_shutdown_msg(&io_codec).await?; + + Ok(peers.0) + } + + /// Start a listener. + async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + let addr = match &self.listen_endpoint { + Some(a) => a.read().await.addr()?.clone(), + None => return Ok(()), + }; + + let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); + + let selfc = self.clone(); + let callback = |conn: Conn| async move { + let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); + timeout(t, selfc.handle_inbound(conn)).await??; + Ok(()) + }; + + self.listener.start(ex, endpoint.clone(), callback).await?; + Ok(()) + } + + /// Handles inbound connection + async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> { + let io_codec = IOCodec::new(conn); + loop { + let msg: NetMsg = io_codec.read().await?; + trace!("Receive msg {:?}", msg.header.command); + + if let NetMsgCmd::Shutdown = msg.header.command { + return Ok(()); + } + + match &msg.header.command { + NetMsgCmd::Ping => { + let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?; + if !version_match(&self.config.version.req, &ping_msg.version) { + return Err(Error::IncompatibleVersion("system: {}".into())); + } + self.send_pong_msg(ping_msg.nonce, &io_codec).await?; + } + NetMsgCmd::FindPeer => { + let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?; + let peer_id = findpeer_msg.0; + self.send_peers_msg(&peer_id, &io_codec).await?; + } + NetMsgCmd::Peer => { + let (peer, _) = decode::<PeerMsg>(&msg.payload)?; + let result = self.table.lock().await.add_entry(peer.clone().into()); + trace!("Add entry result: {:?}", result); + } + c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), + } + } + } + + /// Sends a Ping msg and wait to receive the Pong message. + async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> { + trace!("Send Pong msg"); + + let mut nonce: [u8; 32] = [0; 32]; + RngCore::fill_bytes(&mut OsRng, &mut nonce); + + let ping_msg = PingMsg { + version: self.config.version.v.clone(), + nonce, + }; + io_codec.write(NetMsgCmd::Ping, &ping_msg).await?; + + let t = Duration::from_secs(self.config.lookup_response_timeout); + let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + + let payload = get_msg_payload!(Pong, recv_msg); + let (pong_msg, _) = decode::<PongMsg>(&payload)?; + + if ping_msg.nonce != pong_msg.0 { + return Err(Error::InvalidPongMsg); + } + + Ok(()) + } + + /// Sends a Pong msg + async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> { + trace!("Send Pong msg"); + io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; + Ok(()) + } + + /// Sends a FindPeer msg and wait to receivet the Peers msg. + async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> { + trace!("Send FindPeer msg"); + io_codec + .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) + .await?; + + let t = Duration::from_secs(self.config.lookup_response_timeout); + let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + + let payload = get_msg_payload!(Peers, recv_msg); + let (peers, _) = decode(&payload)?; + + Ok(peers) + } + + /// Sends a Peers msg. + async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> { + trace!("Send Peers msg"); + let table = self.table.lock().await; + let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); + let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect(); + drop(table); + io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?; + Ok(()) + } + + /// Sends a Peer msg. + async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> { + trace!("Send Peer msg"); + let peer_msg = PeerMsg { + addr: endpoint.addr()?.clone(), + port: *endpoint.port()?, + discovery_port: self.config.discovery_port, + peer_id: self.id.clone(), + }; + io_codec.write(NetMsgCmd::Peer, &peer_msg).await?; + Ok(()) + } + + /// Sends a Shutdown msg. + async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> { + trace!("Send Shutdown msg"); + io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; + Ok(()) + } +} diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs new file mode 100644 index 0000000..94b350b --- /dev/null +++ b/p2p/src/discovery/mod.rs @@ -0,0 +1,262 @@ +mod lookup; +mod refresh; + +use std::sync::Arc; + +use log::{error, info}; +use rand::{rngs::OsRng, seq::SliceRandom}; +use smol::lock::Mutex; + +use karyons_core::{ + async_utils::{Backoff, TaskGroup, TaskResult}, + Executor, +}; + +use karyons_net::{Conn, Endpoint}; + +use crate::{ + config::Config, + monitor::Monitor, + net::ConnQueue, + net::{ConnDirection, ConnectionSlots, Connector, Listener}, + routing_table::{ + Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, + UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + }, + Error, PeerID, Result, +}; + +use lookup::LookupService; +use refresh::RefreshService; + +pub type ArcDiscovery = Arc<Discovery>; + +pub struct Discovery { + /// Routing table + table: Arc<Mutex<RoutingTable>>, + + /// Lookup Service + lookup_service: Arc<LookupService>, + + /// Refresh Service + refresh_service: Arc<RefreshService>, + + /// Connector + connector: Arc<Connector>, + /// Listener + listener: Arc<Listener>, + + /// Connection queue + conn_queue: Arc<ConnQueue>, + + /// Inbound slots. + pub(crate) inbound_slots: Arc<ConnectionSlots>, + /// Outbound slots. + pub(crate) outbound_slots: Arc<ConnectionSlots>, + + /// Managing spawned tasks. + task_group: TaskGroup, + + /// Holds the configuration for the P2P network. + config: Arc<Config>, +} + +impl Discovery { + /// Creates a new Discovery + pub fn new( + peer_id: &PeerID, + conn_queue: Arc<ConnQueue>, + config: Arc<Config>, + monitor: Arc<Monitor>, + ) -> ArcDiscovery { + let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); + let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); + + let table_key = peer_id.0; + let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); + + let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); + let lookup_service = + LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); + + let connector = Connector::new( + config.max_connect_retries, + outbound_slots.clone(), + monitor.clone(), + ); + let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + + Arc::new(Self { + refresh_service: Arc::new(refresh_service), + lookup_service: Arc::new(lookup_service), + conn_queue, + table, + inbound_slots, + outbound_slots, + connector, + listener, + task_group: TaskGroup::new(), + config, + }) + } + + /// Start the Discovery + pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + // Check if the listen_endpoint is provided, and if so, start a listener. + if let Some(endpoint) = &self.config.listen_endpoint { + // Return an error if the discovery port is set to 0. + if self.config.discovery_port == 0 { + return Err(Error::Config( + "Please add a valid discovery port".to_string(), + )); + } + + let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; + + if endpoint.addr()? != resolved_endpoint.addr()? { + info!("Resolved listen endpoint: {resolved_endpoint}"); + self.lookup_service + .set_listen_endpoint(&resolved_endpoint) + .await; + self.refresh_service + .set_listen_endpoint(&resolved_endpoint) + .await; + } + } + + // Start the lookup service + self.lookup_service.start(ex.clone()).await?; + // Start the refresh service + self.refresh_service.start(ex.clone()).await?; + + // Attempt to manually connect to peer endpoints provided in the Config. + for endpoint in self.config.peer_endpoints.iter() { + let _ = self.connect(endpoint, None, ex.clone()).await; + } + + // Start connect loop + let selfc = self.clone(); + self.task_group + .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { + if let TaskResult::Completed(Err(err)) = res { + error!("Connect loop stopped: {err}"); + } + }); + + Ok(()) + } + + /// Shuts down the discovery + pub async fn shutdown(&self) { + self.task_group.cancel().await; + self.connector.shutdown().await; + self.listener.shutdown().await; + + self.refresh_service.shutdown().await; + self.lookup_service.shutdown().await; + } + + /// Start a listener and on success, return the resolved endpoint. + async fn start_listener( + self: &Arc<Self>, + endpoint: &Endpoint, + ex: Executor<'_>, + ) -> Result<Endpoint> { + let selfc = self.clone(); + let callback = |conn: Conn| async move { + selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; + Ok(()) + }; + + let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; + Ok(resolved_endpoint) + } + + /// This method will attempt to connect to a peer in the routing table. + /// If the routing table is empty, it will start the seeding process for + /// finding new peers. + /// + /// This will perform a backoff to prevent getting stuck in the loop + /// if the seeding process couldn't find any peers. + async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + let backoff = Backoff::new(500, self.config.seeding_interval * 1000); + loop { + let random_entry = self.random_entry(PENDING_ENTRY).await; + match random_entry { + Some(entry) => { + backoff.reset(); + let endpoint = Endpoint::Tcp(entry.addr, entry.port); + self.connect(&endpoint, Some(entry.key.into()), ex.clone()) + .await; + } + None => { + backoff.sleep().await; + self.start_seeding().await; + } + } + } + } + + /// Connect to the given endpoint using the connector + async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) { + let selfc = self.clone(); + let pid_cloned = pid.clone(); + let cback = |conn: Conn| async move { + selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; + if let Some(pid) = pid_cloned { + selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + } + Ok(()) + }; + + let res = self.connector.connect_with_cback(ex, endpoint, cback).await; + + if let Some(pid) = &pid { + match res { + Ok(_) => { + self.update_entry(pid, CONNECTED_ENTRY).await; + } + Err(_) => { + self.update_entry(pid, UNREACHABLE_ENTRY).await; + } + } + } + } + + /// Starts seeding process. + /// + /// This method randomly selects a peer from the routing table and + /// attempts to connect to that peer for the initial lookup. If the routing + /// table doesn't have an available entry, it will connect to one of the + /// provided bootstrap endpoints in the `Config` and initiate the lookup. + async fn start_seeding(&self) { + match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { + Some(entry) => { + let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); + if let Err(err) = self.lookup_service.start_lookup(&endpoint).await { + self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; + error!("Failed to do lookup: {endpoint}: {err}"); + } + } + None => { + let peers = &self.config.bootstrap_peers; + for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) { + if let Err(err) = self.lookup_service.start_lookup(endpoint).await { + error!("Failed to do lookup: {endpoint}: {err}"); + } + } + } + } + } + + /// Returns a random entry from routing table. + async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { + self.table.lock().await.random_entry(entry_flag).cloned() + } + + /// Update the entry status + async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { + let table = &mut self.table.lock().await; + table.update_entry(&pid.0, entry_flag); + } +} diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs new file mode 100644 index 0000000..7582c84 --- /dev/null +++ b/p2p/src/discovery/refresh.rs @@ -0,0 +1,289 @@ +use std::{sync::Arc, time::Duration}; + +use bincode::{Decode, Encode}; +use log::{error, info, trace}; +use rand::{rngs::OsRng, RngCore}; +use smol::{ + lock::{Mutex, RwLock}, + stream::StreamExt, + Timer, +}; + +use karyons_core::{ + async_utils::{timeout, Backoff, TaskGroup, TaskResult}, + utils::{decode, encode}, + Executor, +}; + +use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; + +/// Maximum failures for an entry before removing it from the routing table. +pub const MAX_FAILURES: u32 = 3; + +/// Ping message size +const PINGMSG_SIZE: usize = 32; + +use crate::{ + monitor::{ConnEvent, DiscoveryEvent, Monitor}, + routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY}, + Config, Error, Result, +}; + +#[derive(Decode, Encode, Debug, Clone)] +pub struct PingMsg(pub [u8; 32]); + +#[derive(Decode, Encode, Debug)] +pub struct PongMsg(pub [u8; 32]); + +pub struct RefreshService { + /// Routing table + table: Arc<Mutex<RoutingTable>>, + + /// Resolved listen endpoint + listen_endpoint: Option<RwLock<Endpoint>>, + + /// Managing spawned tasks. + task_group: TaskGroup, + + /// Holds the configuration for the P2P network. + config: Arc<Config>, + + /// Responsible for network and system monitoring. + monitor: Arc<Monitor>, +} + +impl RefreshService { + /// Creates a new refresh service + pub fn new( + config: Arc<Config>, + table: Arc<Mutex<RoutingTable>>, + monitor: Arc<Monitor>, + ) -> Self { + let listen_endpoint = config + .listen_endpoint + .as_ref() + .map(|endpoint| RwLock::new(endpoint.clone())); + + Self { + table, + listen_endpoint, + task_group: TaskGroup::new(), + config, + monitor, + } + } + + /// Start the refresh service + pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + if let Some(endpoint) = &self.listen_endpoint { + let endpoint = endpoint.read().await; + let addr = endpoint.addr()?; + let port = self.config.discovery_port; + + let selfc = self.clone(); + self.task_group.spawn( + ex.clone(), + selfc.listen_loop(addr.clone(), port), + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + error!("Listen loop stopped: {err}"); + } + }, + ); + } + + let selfc = self.clone(); + self.task_group.spawn( + ex.clone(), + selfc.refresh_loop(ex.clone()), + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + error!("Refresh loop stopped: {err}"); + } + }, + ); + + Ok(()) + } + + /// Set the resolved listen endpoint. + pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { + if let Some(endpoint) = &self.listen_endpoint { + *endpoint.write().await = resolved_endpoint.clone(); + } + } + + /// Shuts down the refresh service + pub async fn shutdown(&self) { + self.task_group.cancel().await; + } + + /// Initiates periodic refreshing of the routing table. This function will + /// select 8 random entries from each bucket in the routing table and start + /// sending Ping messages to the entries. + async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); + loop { + timer.next().await; + trace!("Start refreshing the routing table..."); + + self.monitor + .notify(&DiscoveryEvent::RefreshStarted.into()) + .await; + + let table = self.table.lock().await; + let mut entries: Vec<BucketEntry> = vec![]; + for bucket in table.iter() { + for entry in bucket.random_iter(8) { + entries.push(entry.clone()) + } + } + drop(table); + + self.clone().do_refresh(&entries, ex.clone()).await; + } + } + + /// Iterates over the entries and spawns a new task for each entry to + /// initiate a connection attempt to that entry. + async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) { + for chunk in entries.chunks(16) { + let mut tasks = Vec::new(); + for bucket_entry in chunk { + if bucket_entry.is_connected() { + continue; + } + + if bucket_entry.failures >= MAX_FAILURES { + self.table + .lock() + .await + .remove_entry(&bucket_entry.entry.key); + return; + } + + tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) + } + + for task in tasks { + task.await; + } + } + } + + /// Initiates refresh for a specific entry within the routing table. It + /// updates the routing table according to the result. + async fn refresh_entry(self: Arc<Self>, bucket_entry: BucketEntry) { + let key = &bucket_entry.entry.key; + match self.connect(&bucket_entry.entry).await { + Ok(_) => { + self.table.lock().await.update_entry(key, PENDING_ENTRY); + } + Err(err) => { + trace!("Failed to refresh entry {:?}: {err}", key); + let table = &mut self.table.lock().await; + if bucket_entry.failures >= MAX_FAILURES { + table.remove_entry(key); + return; + } + table.update_entry(key, UNREACHABLE_ENTRY); + } + } + } + + /// Initiates a UDP connection with the entry and attempts to send a Ping + /// message. If it fails, it retries according to the allowed retries + /// specified in the Config, with backoff between each retry. + async fn connect(&self, entry: &Entry) -> Result<()> { + let mut retry = 0; + let conn = dial_udp(&entry.addr, &entry.discovery_port).await?; + let backoff = Backoff::new(100, 5000); + while retry < self.config.refresh_connect_retries { + match self.send_ping_msg(&conn).await { + Ok(()) => return Ok(()), + Err(Error::KaryonsNet(NetError::Timeout)) => { + retry += 1; + backoff.sleep().await; + } + Err(err) => { + return Err(err); + } + } + } + + Err(NetError::Timeout.into()) + } + + /// Set up a UDP listener and start listening for Ping messages from other + /// peers. + async fn listen_loop(self: Arc<Self>, addr: Addr, port: Port) -> Result<()> { + let endpoint = Endpoint::Udp(addr.clone(), port); + let conn = match listen_udp(&addr, &port).await { + Ok(c) => { + self.monitor + .notify(&ConnEvent::Listening(endpoint.clone()).into()) + .await; + c + } + Err(err) => { + self.monitor + .notify(&ConnEvent::ListenFailed(endpoint.clone()).into()) + .await; + return Err(err.into()); + } + }; + info!("Start listening on {endpoint}"); + + loop { + let res = self.listen_to_ping_msg(&conn).await; + if let Err(err) = res { + trace!("Failed to handle ping msg {err}"); + self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + } + } + } + + /// Listen to receive a Ping message and respond with a Pong message. + async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> { + let mut buf = [0; PINGMSG_SIZE]; + let (_, endpoint) = conn.recv_from(&mut buf).await?; + + self.monitor + .notify(&ConnEvent::Accepted(endpoint.clone()).into()) + .await; + + let (ping_msg, _) = decode::<PingMsg>(&buf)?; + + let pong_msg = PongMsg(ping_msg.0); + let buffer = encode(&pong_msg)?; + + conn.send_to(&buffer, &endpoint).await?; + + self.monitor + .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) + .await; + Ok(()) + } + + /// Sends a Ping msg and wait to receive the Pong message. + async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> { + let mut nonce: [u8; 32] = [0; 32]; + RngCore::fill_bytes(&mut OsRng, &mut nonce); + + let ping_msg = PingMsg(nonce); + let buffer = encode(&ping_msg)?; + conn.send(&buffer).await?; + + let buf = &mut [0; PINGMSG_SIZE]; + let t = Duration::from_secs(self.config.refresh_response_timeout); + timeout(t, conn.recv(buf)).await??; + + let (pong_msg, _) = decode::<PongMsg>(buf)?; + + if ping_msg.0 != pong_msg.0 { + return Err(Error::InvalidPongMsg); + } + + Ok(()) + } +} |