author:    hozan23 <hozan23@proton.me>  2023-11-09 11:38:19 +0300
committer: hozan23 <hozan23@proton.me>  2023-11-09 11:38:19 +0300
commit:    849d827486c75b2ab223d7b0e638dbb5b74d4d1d (patch)
tree:      41cd3babc37147ec4a40cab8ce8ae31c91cce33b /karyons_p2p/src/discovery
parent:    de1354525895ffbad18f90a5246fd65157f7449e (diff)
rename crates
Diffstat (limited to 'karyons_p2p/src/discovery')
-rw-r--r-- | karyons_p2p/src/discovery/lookup.rs  | 366
-rw-r--r-- | karyons_p2p/src/discovery/mod.rs     | 262
-rw-r--r-- | karyons_p2p/src/discovery/refresh.rs | 289
3 files changed, 0 insertions, 917 deletions
```diff
diff --git a/karyons_p2p/src/discovery/lookup.rs b/karyons_p2p/src/discovery/lookup.rs
deleted file mode 100644
index f404133..0000000
--- a/karyons_p2p/src/discovery/lookup.rs
+++ /dev/null
@@ -1,366 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use futures_util::{stream::FuturesUnordered, StreamExt};
-use log::{error, trace};
-use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
-use smol::lock::{Mutex, RwLock};
-
-use karyons_core::{async_utils::timeout, utils::decode, Executor};
-
-use karyons_net::{Conn, Endpoint};
-
-use crate::{
-    io_codec::IOCodec,
-    message::{
-        get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg,
-        ShutdownMsg,
-    },
-    monitor::{ConnEvent, DiscoveryEvent, Monitor},
-    net::{ConnectionSlots, Connector, Listener},
-    routing_table::RoutingTable,
-    utils::version_match,
-    Config, Error, PeerID, Result,
-};
-
-/// Maximum number of peers that can be returned in a PeersMsg.
-pub const MAX_PEERS_IN_PEERSMSG: usize = 10;
-
-pub struct LookupService {
-    /// Peer's ID
-    id: PeerID,
-
-    /// Routing Table
-    table: Arc<Mutex<RoutingTable>>,
-
-    /// Listener
-    listener: Arc<Listener>,
-    /// Connector
-    connector: Arc<Connector>,
-
-    /// Outbound slots.
-    outbound_slots: Arc<ConnectionSlots>,
-
-    /// Resolved listen endpoint
-    listen_endpoint: Option<RwLock<Endpoint>>,
-
-    /// Holds the configuration for the P2P network.
-    config: Arc<Config>,
-
-    /// Responsible for network and system monitoring.
-    monitor: Arc<Monitor>,
-}
-
-impl LookupService {
-    /// Creates a new lookup service
-    pub fn new(
-        id: &PeerID,
-        table: Arc<Mutex<RoutingTable>>,
-        config: Arc<Config>,
-        monitor: Arc<Monitor>,
-    ) -> Self {
-        let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
-        let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
-
-        let listener = Listener::new(inbound_slots.clone(), monitor.clone());
-        let connector = Connector::new(
-            config.lookup_connect_retries,
-            outbound_slots.clone(),
-            monitor.clone(),
-        );
-
-        let listen_endpoint = config
-            .listen_endpoint
-            .as_ref()
-            .map(|endpoint| RwLock::new(endpoint.clone()));
-
-        Self {
-            id: id.clone(),
-            table,
-            listener,
-            connector,
-            outbound_slots,
-            listen_endpoint,
-            config,
-            monitor,
-        }
-    }
-
-    /// Start the lookup service.
-    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        self.start_listener(ex).await?;
-        Ok(())
-    }
-
-    /// Set the resolved listen endpoint.
-    pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) {
-        if let Some(endpoint) = &self.listen_endpoint {
-            *endpoint.write().await = resolved_endpoint.clone();
-        }
-    }
-
-    /// Shuts down the lookup service.
-    pub async fn shutdown(&self) {
-        self.connector.shutdown().await;
-        self.listener.shutdown().await;
-    }
-
-    /// Starts an iterative lookup and populates the routing table.
-    ///
-    /// This method begins by generating a random peer ID and connecting to the
-    /// provided endpoint. It then sends a FindPeer message containing the
-    /// randomly generated peer ID. Upon receiving peers from the initial lookup,
-    /// it starts connecting to these received peers and sends them a FindPeer
-    /// message that contains our own peer ID.
-    pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> {
-        trace!("Lookup started {endpoint}");
-        self.monitor
-            .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into())
-            .await;
-
-        let mut random_peers = vec![];
-        if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await {
-            self.monitor
-                .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into())
-                .await;
-            return Err(err);
-        };
-
-        let mut peer_buffer = vec![];
-        self.self_lookup(&random_peers, &mut peer_buffer).await;
-
-        while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
-            match random_peers.pop() {
-                Some(p) => peer_buffer.push(p),
-                None => break,
-            }
-        }
-
-        for peer in peer_buffer.iter() {
-            let mut table = self.table.lock().await;
-            let result = table.add_entry(peer.clone().into());
-            trace!("Add entry {:?}", result);
-        }
-
-        self.monitor
-            .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into())
-            .await;
-
-        Ok(())
-    }
-
-    /// Starts a random lookup
-    ///
-    /// This will perform a lookup on a randomly generated PeerID
-    async fn random_lookup(
-        &self,
-        endpoint: &Endpoint,
-        random_peers: &mut Vec<PeerMsg>,
-    ) -> Result<()> {
-        for _ in 0..2 {
-            let peer_id = PeerID::random();
-            let peers = self.connect(&peer_id, endpoint.clone()).await?;
-            for peer in peers {
-                if random_peers.contains(&peer)
-                    || peer.peer_id == self.id
-                    || self.table.lock().await.contains_key(&peer.peer_id.0)
-                {
-                    continue;
-                }
-
-                random_peers.push(peer);
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Starts a self lookup
-    async fn self_lookup(&self, random_peers: &Vec<PeerMsg>, peer_buffer: &mut Vec<PeerMsg>) {
-        let mut tasks = FuturesUnordered::new();
-        for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
-            let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
-            tasks.push(self.connect(&self.id, endpoint))
-        }
-
-        while let Some(result) = tasks.next().await {
-            match result {
-                Ok(peers) => peer_buffer.extend(peers),
-                Err(err) => {
-                    error!("Failed to do self lookup: {err}");
-                }
-            }
-        }
-    }
-
-    /// Connects to the given endpoint
-    async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> {
-        let conn = self.connector.connect(&endpoint).await?;
-        let io_codec = IOCodec::new(conn);
-        let result = self.handle_outbound(io_codec, peer_id).await;
-
-        self.monitor
-            .notify(&ConnEvent::Disconnected(endpoint).into())
-            .await;
-        self.outbound_slots.remove().await;
-
-        result
-    }
-
-    /// Handles outbound connection
-    async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> {
-        trace!("Send Ping msg");
-        self.send_ping_msg(&io_codec).await?;
-
-        trace!("Send FindPeer msg");
-        let peers = self.send_findpeer_msg(&io_codec, peer_id).await?;
-
-        if peers.0.len() >= MAX_PEERS_IN_PEERSMSG {
-            return Err(Error::Lookup("Received too many peers in PeersMsg"));
-        }
-
-        trace!("Send Peer msg");
-        if let Some(endpoint) = &self.listen_endpoint {
-            self.send_peer_msg(&io_codec, endpoint.read().await.clone())
-                .await?;
-        }
-
-        trace!("Send Shutdown msg");
-        self.send_shutdown_msg(&io_codec).await?;
-
-        Ok(peers.0)
-    }
-
-    /// Start a listener.
-    async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        let addr = match &self.listen_endpoint {
-            Some(a) => a.read().await.addr()?.clone(),
-            None => return Ok(()),
-        };
-
-        let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
-
-        let selfc = self.clone();
-        let callback = |conn: Conn| async move {
-            let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
-            timeout(t, selfc.handle_inbound(conn)).await??;
-            Ok(())
-        };
-
-        self.listener.start(ex, endpoint.clone(), callback).await?;
-        Ok(())
-    }
-
-    /// Handles inbound connection
-    async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> {
-        let io_codec = IOCodec::new(conn);
-        loop {
-            let msg: NetMsg = io_codec.read().await?;
-            trace!("Receive msg {:?}", msg.header.command);
-
-            if let NetMsgCmd::Shutdown = msg.header.command {
-                return Ok(());
-            }
-
-            match &msg.header.command {
-                NetMsgCmd::Ping => {
-                    let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
-                    if !version_match(&self.config.version.req, &ping_msg.version) {
-                        return Err(Error::IncompatibleVersion("system: {}".into()));
-                    }
-                    self.send_pong_msg(ping_msg.nonce, &io_codec).await?;
-                }
-                NetMsgCmd::FindPeer => {
-                    let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
-                    let peer_id = findpeer_msg.0;
-                    self.send_peers_msg(&peer_id, &io_codec).await?;
-                }
-                NetMsgCmd::Peer => {
-                    let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
-                    let result = self.table.lock().await.add_entry(peer.clone().into());
-                    trace!("Add entry result: {:?}", result);
-                }
-                c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
-            }
-        }
-    }
-
-    /// Sends a Ping msg and waits to receive the Pong message.
-    async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> {
-        trace!("Send Ping msg");
-
-        let mut nonce: [u8; 32] = [0; 32];
-        RngCore::fill_bytes(&mut OsRng, &mut nonce);
-
-        let ping_msg = PingMsg {
-            version: self.config.version.v.clone(),
-            nonce,
-        };
-        io_codec.write(NetMsgCmd::Ping, &ping_msg).await?;
-
-        let t = Duration::from_secs(self.config.lookup_response_timeout);
-        let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
-
-        let payload = get_msg_payload!(Pong, recv_msg);
-        let (pong_msg, _) = decode::<PongMsg>(&payload)?;
-
-        if ping_msg.nonce != pong_msg.0 {
-            return Err(Error::InvalidPongMsg);
-        }
-
-        Ok(())
-    }
-
-    /// Sends a Pong msg
-    async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> {
-        trace!("Send Pong msg");
-        io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?;
-        Ok(())
-    }
-
-    /// Sends a FindPeer msg and waits to receive the Peers msg.
-    async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> {
-        trace!("Send FindPeer msg");
-        io_codec
-            .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone()))
-            .await?;
-
-        let t = Duration::from_secs(self.config.lookup_response_timeout);
-        let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
-
-        let payload = get_msg_payload!(Peers, recv_msg);
-        let (peers, _) = decode(&payload)?;
-
-        Ok(peers)
-    }
-
-    /// Sends a Peers msg.
-    async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> {
-        trace!("Send Peers msg");
-        let table = self.table.lock().await;
-        let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
-        let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
-        drop(table);
-        io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?;
-        Ok(())
-    }
-
-    /// Sends a Peer msg.
-    async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> {
-        trace!("Send Peer msg");
-        let peer_msg = PeerMsg {
-            addr: endpoint.addr()?.clone(),
-            port: *endpoint.port()?,
-            discovery_port: self.config.discovery_port,
-            peer_id: self.id.clone(),
-        };
-        io_codec.write(NetMsgCmd::Peer, &peer_msg).await?;
-        Ok(())
-    }
-
-    /// Sends a Shutdown msg.
-    async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> {
-        trace!("Send Shutdown msg");
-        io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?;
-        Ok(())
-    }
-}
```
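The deleted `LookupService` drives a fixed outbound handshake: Ping (carrying a random nonce the Pong must echo), then FindPeer, then optionally Peer to advertise its own listen endpoint, and finally Shutdown. The standalone sketch below mirrors that order; it swaps the crate's `IOCodec` and TCP connections for in-memory channels, and the `Msg` enum with `u64` peer IDs is a simplification (real peer IDs are 32-byte keys). It also omits the version check on Ping and the `MAX_PEERS_IN_PEERSMSG` cap that the real service enforces.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-ins for the crate's lookup messages.
#[derive(Debug)]
enum Msg {
    Ping([u8; 32]),
    Pong([u8; 32]),
    FindPeer(u64),
    Peers(Vec<u64>),
    Peer(u64),
    Shutdown,
}

fn main() {
    let (to_server, server_rx) = mpsc::channel::<Msg>();
    let (to_client, client_rx) = mpsc::channel::<Msg>();

    // Inbound side: mirrors LookupService::handle_inbound's dispatch loop.
    let server = thread::spawn(move || {
        while let Ok(msg) = server_rx.recv() {
            match msg {
                // Echo the nonce back, as send_pong_msg does.
                Msg::Ping(nonce) => to_client.send(Msg::Pong(nonce)).unwrap(),
                // The real service replies with the routing table's closest
                // entries, capped at MAX_PEERS_IN_PEERSMSG (10).
                Msg::FindPeer(id) => to_client.send(Msg::Peers(vec![id + 1, id + 2])).unwrap(),
                Msg::Peer(id) => println!("server: add routing entry for peer {id}"),
                Msg::Shutdown => return,
                other => panic!("unexpected msg: {other:?}"),
            }
        }
    });

    // Outbound side: mirrors handle_outbound's fixed message order.
    let nonce = [7u8; 32];
    to_server.send(Msg::Ping(nonce)).unwrap();
    match client_rx.recv().unwrap() {
        Msg::Pong(n) if n == nonce => {} // nonce must round-trip intact
        _ => panic!("invalid Pong"),
    }

    to_server.send(Msg::FindPeer(42)).unwrap();
    match client_rx.recv().unwrap() {
        Msg::Peers(peers) => println!("client: received peers {peers:?}"),
        _ => panic!("expected Peers"),
    }

    to_server.send(Msg::Peer(1)).unwrap(); // advertise our listen endpoint
    to_server.send(Msg::Shutdown).unwrap(); // end the lookup session
    server.join().unwrap();
}
```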
```diff
diff --git a/karyons_p2p/src/discovery/mod.rs b/karyons_p2p/src/discovery/mod.rs
deleted file mode 100644
index 94b350b..0000000
--- a/karyons_p2p/src/discovery/mod.rs
+++ /dev/null
@@ -1,262 +0,0 @@
-mod lookup;
-mod refresh;
-
-use std::sync::Arc;
-
-use log::{error, info};
-use rand::{rngs::OsRng, seq::SliceRandom};
-use smol::lock::Mutex;
-
-use karyons_core::{
-    async_utils::{Backoff, TaskGroup, TaskResult},
-    Executor,
-};
-
-use karyons_net::{Conn, Endpoint};
-
-use crate::{
-    config::Config,
-    monitor::Monitor,
-    net::ConnQueue,
-    net::{ConnDirection, ConnectionSlots, Connector, Listener},
-    routing_table::{
-        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
-        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
-    },
-    Error, PeerID, Result,
-};
-
-use lookup::LookupService;
-use refresh::RefreshService;
-
-pub type ArcDiscovery = Arc<Discovery>;
-
-pub struct Discovery {
-    /// Routing table
-    table: Arc<Mutex<RoutingTable>>,
-
-    /// Lookup Service
-    lookup_service: Arc<LookupService>,
-
-    /// Refresh Service
-    refresh_service: Arc<RefreshService>,
-
-    /// Connector
-    connector: Arc<Connector>,
-    /// Listener
-    listener: Arc<Listener>,
-
-    /// Connection queue
-    conn_queue: Arc<ConnQueue>,
-
-    /// Inbound slots.
-    pub(crate) inbound_slots: Arc<ConnectionSlots>,
-    /// Outbound slots.
-    pub(crate) outbound_slots: Arc<ConnectionSlots>,
-
-    /// Managing spawned tasks.
-    task_group: TaskGroup,
-
-    /// Holds the configuration for the P2P network.
-    config: Arc<Config>,
-}
-
-impl Discovery {
-    /// Creates a new Discovery
-    pub fn new(
-        peer_id: &PeerID,
-        conn_queue: Arc<ConnQueue>,
-        config: Arc<Config>,
-        monitor: Arc<Monitor>,
-    ) -> ArcDiscovery {
-        let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
-        let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
-
-        let table_key = peer_id.0;
-        let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
-
-        let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone());
-        let lookup_service =
-            LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone());
-
-        let connector = Connector::new(
-            config.max_connect_retries,
-            outbound_slots.clone(),
-            monitor.clone(),
-        );
-        let listener = Listener::new(inbound_slots.clone(), monitor.clone());
-
-        Arc::new(Self {
-            refresh_service: Arc::new(refresh_service),
-            lookup_service: Arc::new(lookup_service),
-            conn_queue,
-            table,
-            inbound_slots,
-            outbound_slots,
-            connector,
-            listener,
-            task_group: TaskGroup::new(),
-            config,
-        })
-    }
-
-    /// Start the Discovery
-    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        // Check if the listen_endpoint is provided, and if so, start a listener.
-        if let Some(endpoint) = &self.config.listen_endpoint {
-            // Return an error if the discovery port is set to 0.
-            if self.config.discovery_port == 0 {
-                return Err(Error::Config(
-                    "Please add a valid discovery port".to_string(),
-                ));
-            }
-
-            let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?;
-
-            if endpoint.addr()? != resolved_endpoint.addr()? {
-                info!("Resolved listen endpoint: {resolved_endpoint}");
-                self.lookup_service
-                    .set_listen_endpoint(&resolved_endpoint)
-                    .await;
-                self.refresh_service
-                    .set_listen_endpoint(&resolved_endpoint)
-                    .await;
-            }
-        }
-
-        // Start the lookup service
-        self.lookup_service.start(ex.clone()).await?;
-        // Start the refresh service
-        self.refresh_service.start(ex.clone()).await?;
-
-        // Attempt to manually connect to peer endpoints provided in the Config.
-        for endpoint in self.config.peer_endpoints.iter() {
-            let _ = self.connect(endpoint, None, ex.clone()).await;
-        }
-
-        // Start connect loop
-        let selfc = self.clone();
-        self.task_group
-            .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move {
-                if let TaskResult::Completed(Err(err)) = res {
-                    error!("Connect loop stopped: {err}");
-                }
-            });
-
-        Ok(())
-    }
-
-    /// Shuts down the discovery
-    pub async fn shutdown(&self) {
-        self.task_group.cancel().await;
-        self.connector.shutdown().await;
-        self.listener.shutdown().await;
-
-        self.refresh_service.shutdown().await;
-        self.lookup_service.shutdown().await;
-    }
-
-    /// Start a listener and on success, return the resolved endpoint.
-    async fn start_listener(
-        self: &Arc<Self>,
-        endpoint: &Endpoint,
-        ex: Executor<'_>,
-    ) -> Result<Endpoint> {
-        let selfc = self.clone();
-        let callback = |conn: Conn| async move {
-            selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
-            Ok(())
-        };
-
-        let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?;
-        Ok(resolved_endpoint)
-    }
-
-    /// This method will attempt to connect to a peer in the routing table.
-    /// If the routing table is empty, it will start the seeding process for
-    /// finding new peers.
-    ///
-    /// This will perform a backoff to prevent getting stuck in the loop
-    /// if the seeding process couldn't find any peers.
-    async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
-        loop {
-            let random_entry = self.random_entry(PENDING_ENTRY).await;
-            match random_entry {
-                Some(entry) => {
-                    backoff.reset();
-                    let endpoint = Endpoint::Tcp(entry.addr, entry.port);
-                    self.connect(&endpoint, Some(entry.key.into()), ex.clone())
-                        .await;
-                }
-                None => {
-                    backoff.sleep().await;
-                    self.start_seeding().await;
-                }
-            }
-        }
-    }
-
-    /// Connect to the given endpoint using the connector
-    async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) {
-        let selfc = self.clone();
-        let pid_cloned = pid.clone();
-        let cback = |conn: Conn| async move {
-            selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-            if let Some(pid) = pid_cloned {
-                selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
-            }
-            Ok(())
-        };
-
-        let res = self.connector.connect_with_cback(ex, endpoint, cback).await;
-
-        if let Some(pid) = &pid {
-            match res {
-                Ok(_) => {
-                    self.update_entry(pid, CONNECTED_ENTRY).await;
-                }
-                Err(_) => {
-                    self.update_entry(pid, UNREACHABLE_ENTRY).await;
-                }
-            }
-        }
-    }
-
-    /// Starts the seeding process.
-    ///
-    /// This method randomly selects a peer from the routing table and
-    /// attempts to connect to that peer for the initial lookup. If the routing
-    /// table doesn't have an available entry, it will connect to one of the
-    /// provided bootstrap endpoints in the `Config` and initiate the lookup.
-    async fn start_seeding(&self) {
-        match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
-            Some(entry) => {
-                let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
-                if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
-                    self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
-                    error!("Failed to do lookup: {endpoint}: {err}");
-                }
-            }
-            None => {
-                let peers = &self.config.bootstrap_peers;
-                for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
-                    if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
-                        error!("Failed to do lookup: {endpoint}: {err}");
-                    }
-                }
-            }
-        }
-    }
-
-    /// Returns a random entry from the routing table.
-    async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
-        self.table.lock().await.random_entry(entry_flag).cloned()
-    }
-
-    /// Update the entry status
-    async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
-        let table = &mut self.table.lock().await;
-        table.update_entry(&pid.0, entry_flag);
-    }
-}
```
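`Discovery::connect_loop` alternates between dialing a random pending entry and, when the table has none, seeding through the lookup service, with `Backoff::new(500, seeding_interval * 1000)` keeping the empty-table case from spinning. The sketch below is a simplified, synchronous rendering of that pattern; the doubling-with-cap behavior and the `8_000` ms ceiling are assumptions for illustration, not the actual `karyons_core::async_utils::Backoff` implementation (which is async).

```rust
use std::{thread, time::Duration};

/// Simplified, synchronous sketch of the backoff used by connect_loop.
/// Doubling-with-cap is an assumed policy; the real type lives in karyons_core.
struct Backoff {
    base_ms: u64,
    max_ms: u64,
    retries: u32,
}

impl Backoff {
    fn new(base_ms: u64, max_ms: u64) -> Self {
        Self { base_ms, max_ms, retries: 0 }
    }

    /// Sleep for base * 2^retries milliseconds, capped at max.
    fn sleep(&mut self) {
        let exp = self.retries.min(20); // keep the shift well in range
        let delay = self.base_ms.saturating_mul(1 << exp).min(self.max_ms);
        thread::sleep(Duration::from_millis(delay));
        self.retries += 1;
    }

    /// Reset after progress, as connect_loop does on finding a pending entry.
    fn reset(&mut self) {
        self.retries = 0;
    }
}

fn main() {
    // connect_loop builds Backoff::new(500, seeding_interval * 1000);
    // 8_000 ms stands in for that ceiling here.
    let mut backoff = Backoff::new(500, 8_000);
    for attempt in 1..=5 {
        println!("routing table empty; seeding (attempt {attempt})");
        backoff.sleep(); // sleeps 500ms, 1s, 2s, 4s, then 8s (capped)
    }
    backoff.reset(); // a pending entry showed up; start over at 500ms
}
```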
```diff
diff --git a/karyons_p2p/src/discovery/refresh.rs b/karyons_p2p/src/discovery/refresh.rs
deleted file mode 100644
index 7582c84..0000000
--- a/karyons_p2p/src/discovery/refresh.rs
+++ /dev/null
@@ -1,289 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use bincode::{Decode, Encode};
-use log::{error, info, trace};
-use rand::{rngs::OsRng, RngCore};
-use smol::{
-    lock::{Mutex, RwLock},
-    stream::StreamExt,
-    Timer,
-};
-
-use karyons_core::{
-    async_utils::{timeout, Backoff, TaskGroup, TaskResult},
-    utils::{decode, encode},
-    Executor,
-};
-
-use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
-
-/// Maximum failures for an entry before removing it from the routing table.
-pub const MAX_FAILURES: u32 = 3;
-
-/// Ping message size
-const PINGMSG_SIZE: usize = 32;
-
-use crate::{
-    monitor::{ConnEvent, DiscoveryEvent, Monitor},
-    routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY},
-    Config, Error, Result,
-};
-
-#[derive(Decode, Encode, Debug, Clone)]
-pub struct PingMsg(pub [u8; 32]);
-
-#[derive(Decode, Encode, Debug)]
-pub struct PongMsg(pub [u8; 32]);
-
-pub struct RefreshService {
-    /// Routing table
-    table: Arc<Mutex<RoutingTable>>,
-
-    /// Resolved listen endpoint
-    listen_endpoint: Option<RwLock<Endpoint>>,
-
-    /// Managing spawned tasks.
-    task_group: TaskGroup,
-
-    /// Holds the configuration for the P2P network.
-    config: Arc<Config>,
-
-    /// Responsible for network and system monitoring.
-    monitor: Arc<Monitor>,
-}
-
-impl RefreshService {
-    /// Creates a new refresh service
-    pub fn new(
-        config: Arc<Config>,
-        table: Arc<Mutex<RoutingTable>>,
-        monitor: Arc<Monitor>,
-    ) -> Self {
-        let listen_endpoint = config
-            .listen_endpoint
-            .as_ref()
-            .map(|endpoint| RwLock::new(endpoint.clone()));
-
-        Self {
-            table,
-            listen_endpoint,
-            task_group: TaskGroup::new(),
-            config,
-            monitor,
-        }
-    }
-
-    /// Start the refresh service
-    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        if let Some(endpoint) = &self.listen_endpoint {
-            let endpoint = endpoint.read().await;
-            let addr = endpoint.addr()?;
-            let port = self.config.discovery_port;
-
-            let selfc = self.clone();
-            self.task_group.spawn(
-                ex.clone(),
-                selfc.listen_loop(addr.clone(), port),
-                |res| async move {
-                    if let TaskResult::Completed(Err(err)) = res {
-                        error!("Listen loop stopped: {err}");
-                    }
-                },
-            );
-        }
-
-        let selfc = self.clone();
-        self.task_group.spawn(
-            ex.clone(),
-            selfc.refresh_loop(ex.clone()),
-            |res| async move {
-                if let TaskResult::Completed(Err(err)) = res {
-                    error!("Refresh loop stopped: {err}");
-                }
-            },
-        );
-
-        Ok(())
-    }
-
-    /// Set the resolved listen endpoint.
-    pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) {
-        if let Some(endpoint) = &self.listen_endpoint {
-            *endpoint.write().await = resolved_endpoint.clone();
-        }
-    }
-
-    /// Shuts down the refresh service
-    pub async fn shutdown(&self) {
-        self.task_group.cancel().await;
-    }
-
-    /// Initiates periodic refreshing of the routing table. This function will
-    /// select 8 random entries from each bucket in the routing table and start
-    /// sending Ping messages to the entries.
-    async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
-        let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
-        loop {
-            timer.next().await;
-            trace!("Start refreshing the routing table...");
-
-            self.monitor
-                .notify(&DiscoveryEvent::RefreshStarted.into())
-                .await;
-
-            let table = self.table.lock().await;
-            let mut entries: Vec<BucketEntry> = vec![];
-            for bucket in table.iter() {
-                for entry in bucket.random_iter(8) {
-                    entries.push(entry.clone())
-                }
-            }
-            drop(table);
-
-            self.clone().do_refresh(&entries, ex.clone()).await;
-        }
-    }
-
-    /// Iterates over the entries and spawns a new task for each entry to
-    /// initiate a connection attempt to that entry.
-    async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) {
-        for chunk in entries.chunks(16) {
-            let mut tasks = Vec::new();
-            for bucket_entry in chunk {
-                if bucket_entry.is_connected() {
-                    continue;
-                }
-
-                if bucket_entry.failures >= MAX_FAILURES {
-                    self.table
-                        .lock()
-                        .await
-                        .remove_entry(&bucket_entry.entry.key);
-                    return;
-                }
-
-                tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
-            }
-
-            for task in tasks {
-                task.await;
-            }
-        }
-    }
-
-    /// Initiates a refresh for a specific entry within the routing table. It
-    /// updates the routing table according to the result.
-    async fn refresh_entry(self: Arc<Self>, bucket_entry: BucketEntry) {
-        let key = &bucket_entry.entry.key;
-        match self.connect(&bucket_entry.entry).await {
-            Ok(_) => {
-                self.table.lock().await.update_entry(key, PENDING_ENTRY);
-            }
-            Err(err) => {
-                trace!("Failed to refresh entry {:?}: {err}", key);
-                let table = &mut self.table.lock().await;
-                if bucket_entry.failures >= MAX_FAILURES {
-                    table.remove_entry(key);
-                    return;
-                }
-                table.update_entry(key, UNREACHABLE_ENTRY);
-            }
-        }
-    }
-
-    /// Initiates a UDP connection with the entry and attempts to send a Ping
-    /// message. If it fails, it retries according to the allowed retries
-    /// specified in the Config, with backoff between each retry.
-    async fn connect(&self, entry: &Entry) -> Result<()> {
-        let mut retry = 0;
-        let conn = dial_udp(&entry.addr, &entry.discovery_port).await?;
-        let backoff = Backoff::new(100, 5000);
-        while retry < self.config.refresh_connect_retries {
-            match self.send_ping_msg(&conn).await {
-                Ok(()) => return Ok(()),
-                Err(Error::KaryonsNet(NetError::Timeout)) => {
-                    retry += 1;
-                    backoff.sleep().await;
-                }
-                Err(err) => {
-                    return Err(err);
-                }
-            }
-        }
-
-        Err(NetError::Timeout.into())
-    }
-
-    /// Set up a UDP listener and start listening for Ping messages from other
-    /// peers.
-    async fn listen_loop(self: Arc<Self>, addr: Addr, port: Port) -> Result<()> {
-        let endpoint = Endpoint::Udp(addr.clone(), port);
-        let conn = match listen_udp(&addr, &port).await {
-            Ok(c) => {
-                self.monitor
-                    .notify(&ConnEvent::Listening(endpoint.clone()).into())
-                    .await;
-                c
-            }
-            Err(err) => {
-                self.monitor
-                    .notify(&ConnEvent::ListenFailed(endpoint.clone()).into())
-                    .await;
-                return Err(err.into());
-            }
-        };
-        info!("Start listening on {endpoint}");
-
-        loop {
-            let res = self.listen_to_ping_msg(&conn).await;
-            if let Err(err) = res {
-                trace!("Failed to handle ping msg {err}");
-                self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
-            }
-        }
-    }
-
-    /// Listen to receive a Ping message and respond with a Pong message.
-    async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> {
-        let mut buf = [0; PINGMSG_SIZE];
-        let (_, endpoint) = conn.recv_from(&mut buf).await?;
-
-        self.monitor
-            .notify(&ConnEvent::Accepted(endpoint.clone()).into())
-            .await;
-
-        let (ping_msg, _) = decode::<PingMsg>(&buf)?;
-
-        let pong_msg = PongMsg(ping_msg.0);
-        let buffer = encode(&pong_msg)?;
-
-        conn.send_to(&buffer, &endpoint).await?;
-
-        self.monitor
-            .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
-            .await;
-        Ok(())
-    }
-
-    /// Sends a Ping msg and waits to receive the Pong message.
-    async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> {
-        let mut nonce: [u8; 32] = [0; 32];
-        RngCore::fill_bytes(&mut OsRng, &mut nonce);
-
-        let ping_msg = PingMsg(nonce);
-        let buffer = encode(&ping_msg)?;
-        conn.send(&buffer).await?;
-
-        let buf = &mut [0; PINGMSG_SIZE];
-        let t = Duration::from_secs(self.config.refresh_response_timeout);
-        timeout(t, conn.recv(buf)).await??;
-
-        let (pong_msg, _) = decode::<PongMsg>(buf)?;
-
-        if ping_msg.0 != pong_msg.0 {
-            return Err(Error::InvalidPongMsg);
-        }
-
-        Ok(())
-    }
-}
```
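The deleted `RefreshService` speaks a bare 32-byte ping/pong over UDP: the listener echoes each received nonce back, and the sender treats a mismatched echo as `InvalidPongMsg` and a timeout as a failure (entries are evicted after `MAX_FAILURES`). The standalone sketch below reproduces that round trip with `std::net::UdpSocket`; the fixed nonce and 2-second read timeout stand in for `OsRng` and `refresh_response_timeout`, which are assumptions of the sketch.

```rust
use std::net::UdpSocket;
use std::time::Duration;

const PINGMSG_SIZE: usize = 32;

fn main() -> std::io::Result<()> {
    // Listener side: the refresh service binds the discovery port over UDP
    // and answers every Ping with a Pong carrying the same nonce.
    let listener = UdpSocket::bind("127.0.0.1:0")?;
    let listen_addr = listener.local_addr()?;
    std::thread::spawn(move || {
        let mut buf = [0u8; PINGMSG_SIZE];
        if let Ok((n, peer)) = listener.recv_from(&mut buf) {
            let _ = listener.send_to(&buf[..n], peer);
        }
    });

    // Sender side: mirrors send_ping_msg. A fixed nonce replaces OsRng here,
    // and the 2s read timeout stands in for refresh_response_timeout.
    let conn = UdpSocket::bind("127.0.0.1:0")?;
    conn.set_read_timeout(Some(Duration::from_secs(2)))?;
    let nonce: [u8; PINGMSG_SIZE] = std::array::from_fn(|i| i as u8);
    conn.send_to(&nonce, listen_addr)?;

    let mut pong = [0u8; PINGMSG_SIZE];
    conn.recv_from(&mut pong)?;
    // A mismatched nonce maps to Error::InvalidPongMsg in the real code.
    assert_eq!(nonce, pong, "invalid Pong: nonce mismatch");
    println!("refresh ping/pong round trip succeeded");
    Ok(())
}
```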