aboutsummaryrefslogtreecommitdiff
path: root/karyons_p2p/src/discovery
diff options
context:
space:
mode:
Diffstat (limited to 'karyons_p2p/src/discovery')
-rw-r--r--karyons_p2p/src/discovery/lookup.rs366
-rw-r--r--karyons_p2p/src/discovery/mod.rs262
-rw-r--r--karyons_p2p/src/discovery/refresh.rs289
3 files changed, 0 insertions, 917 deletions
diff --git a/karyons_p2p/src/discovery/lookup.rs b/karyons_p2p/src/discovery/lookup.rs
deleted file mode 100644
index f404133..0000000
--- a/karyons_p2p/src/discovery/lookup.rs
+++ /dev/null
@@ -1,366 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use futures_util::{stream::FuturesUnordered, StreamExt};
-use log::{error, trace};
-use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
-use smol::lock::{Mutex, RwLock};
-
-use karyons_core::{async_utils::timeout, utils::decode, Executor};
-
-use karyons_net::{Conn, Endpoint};
-
-use crate::{
- io_codec::IOCodec,
- message::{
- get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg,
- ShutdownMsg,
- },
- monitor::{ConnEvent, DiscoveryEvent, Monitor},
- net::{ConnectionSlots, Connector, Listener},
- routing_table::RoutingTable,
- utils::version_match,
- Config, Error, PeerID, Result,
-};
-
-/// Maximum number of peers that can be returned in a PeersMsg.
-pub const MAX_PEERS_IN_PEERSMSG: usize = 10;
-
-pub struct LookupService {
- /// Peer's ID
- id: PeerID,
-
- /// Routing Table
- table: Arc<Mutex<RoutingTable>>,
-
- /// Listener
- listener: Arc<Listener>,
- /// Connector
- connector: Arc<Connector>,
-
- /// Outbound slots.
- outbound_slots: Arc<ConnectionSlots>,
-
- /// Resolved listen endpoint
- listen_endpoint: Option<RwLock<Endpoint>>,
-
- /// Holds the configuration for the P2P network.
- config: Arc<Config>,
-
- /// Responsible for network and system monitoring.
- monitor: Arc<Monitor>,
-}
-
-impl LookupService {
- /// Creates a new lookup service
- pub fn new(
- id: &PeerID,
- table: Arc<Mutex<RoutingTable>>,
- config: Arc<Config>,
- monitor: Arc<Monitor>,
- ) -> Self {
- let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
- let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
-
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
- let connector = Connector::new(
- config.lookup_connect_retries,
- outbound_slots.clone(),
- monitor.clone(),
- );
-
- let listen_endpoint = config
- .listen_endpoint
- .as_ref()
- .map(|endpoint| RwLock::new(endpoint.clone()));
-
- Self {
- id: id.clone(),
- table,
- listener,
- connector,
- outbound_slots,
- listen_endpoint,
- config,
- monitor,
- }
- }
-
- /// Start the lookup service.
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- self.start_listener(ex).await?;
- Ok(())
- }
-
- /// Set the resolved listen endpoint.
- pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) {
- if let Some(endpoint) = &self.listen_endpoint {
- *endpoint.write().await = resolved_endpoint.clone();
- }
- }
-
- /// Shuts down the lookup service.
- pub async fn shutdown(&self) {
- self.connector.shutdown().await;
- self.listener.shutdown().await;
- }
-
- /// Starts iterative lookup and populate the routing table.
- ///
- /// This method begins by generating a random peer ID and connecting to the
- /// provided endpoint. It then sends a FindPeer message containing the
- /// randomly generated peer ID. Upon receiving peers from the initial lookup,
- /// it starts connecting to these received peers and sends them a FindPeer
- /// message that contains our own peer ID.
- pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> {
- trace!("Lookup started {endpoint}");
- self.monitor
- .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into())
- .await;
-
- let mut random_peers = vec![];
- if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await {
- self.monitor
- .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into())
- .await;
- return Err(err);
- };
-
- let mut peer_buffer = vec![];
- self.self_lookup(&random_peers, &mut peer_buffer).await;
-
- while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
- match random_peers.pop() {
- Some(p) => peer_buffer.push(p),
- None => break,
- }
- }
-
- for peer in peer_buffer.iter() {
- let mut table = self.table.lock().await;
- let result = table.add_entry(peer.clone().into());
- trace!("Add entry {:?}", result);
- }
-
- self.monitor
- .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into())
- .await;
-
- Ok(())
- }
-
- /// Starts a random lookup
- ///
- /// This will perfom lookup on a random generated PeerID
- async fn random_lookup(
- &self,
- endpoint: &Endpoint,
- random_peers: &mut Vec<PeerMsg>,
- ) -> Result<()> {
- for _ in 0..2 {
- let peer_id = PeerID::random();
- let peers = self.connect(&peer_id, endpoint.clone()).await?;
- for peer in peers {
- if random_peers.contains(&peer)
- || peer.peer_id == self.id
- || self.table.lock().await.contains_key(&peer.peer_id.0)
- {
- continue;
- }
-
- random_peers.push(peer);
- }
- }
-
- Ok(())
- }
-
- /// Starts a self lookup
- async fn self_lookup(&self, random_peers: &Vec<PeerMsg>, peer_buffer: &mut Vec<PeerMsg>) {
- let mut tasks = FuturesUnordered::new();
- for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
- let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
- tasks.push(self.connect(&self.id, endpoint))
- }
-
- while let Some(result) = tasks.next().await {
- match result {
- Ok(peers) => peer_buffer.extend(peers),
- Err(err) => {
- error!("Failed to do self lookup: {err}");
- }
- }
- }
- }
-
- /// Connects to the given endpoint
- async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> {
- let conn = self.connector.connect(&endpoint).await?;
- let io_codec = IOCodec::new(conn);
- let result = self.handle_outbound(io_codec, peer_id).await;
-
- self.monitor
- .notify(&ConnEvent::Disconnected(endpoint).into())
- .await;
- self.outbound_slots.remove().await;
-
- result
- }
-
- /// Handles outbound connection
- async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> {
- trace!("Send Ping msg");
- self.send_ping_msg(&io_codec).await?;
-
- trace!("Send FindPeer msg");
- let peers = self.send_findpeer_msg(&io_codec, peer_id).await?;
-
- if peers.0.len() >= MAX_PEERS_IN_PEERSMSG {
- return Err(Error::Lookup("Received too many peers in PeersMsg"));
- }
-
- trace!("Send Peer msg");
- if let Some(endpoint) = &self.listen_endpoint {
- self.send_peer_msg(&io_codec, endpoint.read().await.clone())
- .await?;
- }
-
- trace!("Send Shutdown msg");
- self.send_shutdown_msg(&io_codec).await?;
-
- Ok(peers.0)
- }
-
- /// Start a listener.
- async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- let addr = match &self.listen_endpoint {
- Some(a) => a.read().await.addr()?.clone(),
- None => return Ok(()),
- };
-
- let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
-
- let selfc = self.clone();
- let callback = |conn: Conn| async move {
- let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
- timeout(t, selfc.handle_inbound(conn)).await??;
- Ok(())
- };
-
- self.listener.start(ex, endpoint.clone(), callback).await?;
- Ok(())
- }
-
- /// Handles inbound connection
- async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> {
- let io_codec = IOCodec::new(conn);
- loop {
- let msg: NetMsg = io_codec.read().await?;
- trace!("Receive msg {:?}", msg.header.command);
-
- if let NetMsgCmd::Shutdown = msg.header.command {
- return Ok(());
- }
-
- match &msg.header.command {
- NetMsgCmd::Ping => {
- let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
- if !version_match(&self.config.version.req, &ping_msg.version) {
- return Err(Error::IncompatibleVersion("system: {}".into()));
- }
- self.send_pong_msg(ping_msg.nonce, &io_codec).await?;
- }
- NetMsgCmd::FindPeer => {
- let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
- let peer_id = findpeer_msg.0;
- self.send_peers_msg(&peer_id, &io_codec).await?;
- }
- NetMsgCmd::Peer => {
- let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
- let result = self.table.lock().await.add_entry(peer.clone().into());
- trace!("Add entry result: {:?}", result);
- }
- c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
- }
- }
- }
-
- /// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> {
- trace!("Send Pong msg");
-
- let mut nonce: [u8; 32] = [0; 32];
- RngCore::fill_bytes(&mut OsRng, &mut nonce);
-
- let ping_msg = PingMsg {
- version: self.config.version.v.clone(),
- nonce,
- };
- io_codec.write(NetMsgCmd::Ping, &ping_msg).await?;
-
- let t = Duration::from_secs(self.config.lookup_response_timeout);
- let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
-
- let payload = get_msg_payload!(Pong, recv_msg);
- let (pong_msg, _) = decode::<PongMsg>(&payload)?;
-
- if ping_msg.nonce != pong_msg.0 {
- return Err(Error::InvalidPongMsg);
- }
-
- Ok(())
- }
-
- /// Sends a Pong msg
- async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> {
- trace!("Send Pong msg");
- io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?;
- Ok(())
- }
-
- /// Sends a FindPeer msg and wait to receivet the Peers msg.
- async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> {
- trace!("Send FindPeer msg");
- io_codec
- .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone()))
- .await?;
-
- let t = Duration::from_secs(self.config.lookup_response_timeout);
- let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
-
- let payload = get_msg_payload!(Peers, recv_msg);
- let (peers, _) = decode(&payload)?;
-
- Ok(peers)
- }
-
- /// Sends a Peers msg.
- async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> {
- trace!("Send Peers msg");
- let table = self.table.lock().await;
- let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
- let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
- drop(table);
- io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?;
- Ok(())
- }
-
- /// Sends a Peer msg.
- async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> {
- trace!("Send Peer msg");
- let peer_msg = PeerMsg {
- addr: endpoint.addr()?.clone(),
- port: *endpoint.port()?,
- discovery_port: self.config.discovery_port,
- peer_id: self.id.clone(),
- };
- io_codec.write(NetMsgCmd::Peer, &peer_msg).await?;
- Ok(())
- }
-
- /// Sends a Shutdown msg.
- async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> {
- trace!("Send Shutdown msg");
- io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?;
- Ok(())
- }
-}
diff --git a/karyons_p2p/src/discovery/mod.rs b/karyons_p2p/src/discovery/mod.rs
deleted file mode 100644
index 94b350b..0000000
--- a/karyons_p2p/src/discovery/mod.rs
+++ /dev/null
@@ -1,262 +0,0 @@
-mod lookup;
-mod refresh;
-
-use std::sync::Arc;
-
-use log::{error, info};
-use rand::{rngs::OsRng, seq::SliceRandom};
-use smol::lock::Mutex;
-
-use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
- Executor,
-};
-
-use karyons_net::{Conn, Endpoint};
-
-use crate::{
- config::Config,
- monitor::Monitor,
- net::ConnQueue,
- net::{ConnDirection, ConnectionSlots, Connector, Listener},
- routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
- UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
- },
- Error, PeerID, Result,
-};
-
-use lookup::LookupService;
-use refresh::RefreshService;
-
-pub type ArcDiscovery = Arc<Discovery>;
-
-pub struct Discovery {
- /// Routing table
- table: Arc<Mutex<RoutingTable>>,
-
- /// Lookup Service
- lookup_service: Arc<LookupService>,
-
- /// Refresh Service
- refresh_service: Arc<RefreshService>,
-
- /// Connector
- connector: Arc<Connector>,
- /// Listener
- listener: Arc<Listener>,
-
- /// Connection queue
- conn_queue: Arc<ConnQueue>,
-
- /// Inbound slots.
- pub(crate) inbound_slots: Arc<ConnectionSlots>,
- /// Outbound slots.
- pub(crate) outbound_slots: Arc<ConnectionSlots>,
-
- /// Managing spawned tasks.
- task_group: TaskGroup,
-
- /// Holds the configuration for the P2P network.
- config: Arc<Config>,
-}
-
-impl Discovery {
- /// Creates a new Discovery
- pub fn new(
- peer_id: &PeerID,
- conn_queue: Arc<ConnQueue>,
- config: Arc<Config>,
- monitor: Arc<Monitor>,
- ) -> ArcDiscovery {
- let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
- let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
-
- let table_key = peer_id.0;
- let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
-
- let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone());
- let lookup_service =
- LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone());
-
- let connector = Connector::new(
- config.max_connect_retries,
- outbound_slots.clone(),
- monitor.clone(),
- );
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
-
- Arc::new(Self {
- refresh_service: Arc::new(refresh_service),
- lookup_service: Arc::new(lookup_service),
- conn_queue,
- table,
- inbound_slots,
- outbound_slots,
- connector,
- listener,
- task_group: TaskGroup::new(),
- config,
- })
- }
-
- /// Start the Discovery
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- // Check if the listen_endpoint is provided, and if so, start a listener.
- if let Some(endpoint) = &self.config.listen_endpoint {
- // Return an error if the discovery port is set to 0.
- if self.config.discovery_port == 0 {
- return Err(Error::Config(
- "Please add a valid discovery port".to_string(),
- ));
- }
-
- let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?;
-
- if endpoint.addr()? != resolved_endpoint.addr()? {
- info!("Resolved listen endpoint: {resolved_endpoint}");
- self.lookup_service
- .set_listen_endpoint(&resolved_endpoint)
- .await;
- self.refresh_service
- .set_listen_endpoint(&resolved_endpoint)
- .await;
- }
- }
-
- // Start the lookup service
- self.lookup_service.start(ex.clone()).await?;
- // Start the refresh service
- self.refresh_service.start(ex.clone()).await?;
-
- // Attempt to manually connect to peer endpoints provided in the Config.
- for endpoint in self.config.peer_endpoints.iter() {
- let _ = self.connect(endpoint, None, ex.clone()).await;
- }
-
- // Start connect loop
- let selfc = self.clone();
- self.task_group
- .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Connect loop stopped: {err}");
- }
- });
-
- Ok(())
- }
-
- /// Shuts down the discovery
- pub async fn shutdown(&self) {
- self.task_group.cancel().await;
- self.connector.shutdown().await;
- self.listener.shutdown().await;
-
- self.refresh_service.shutdown().await;
- self.lookup_service.shutdown().await;
- }
-
- /// Start a listener and on success, return the resolved endpoint.
- async fn start_listener(
- self: &Arc<Self>,
- endpoint: &Endpoint,
- ex: Executor<'_>,
- ) -> Result<Endpoint> {
- let selfc = self.clone();
- let callback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
- Ok(())
- };
-
- let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?;
- Ok(resolved_endpoint)
- }
-
- /// This method will attempt to connect to a peer in the routing table.
- /// If the routing table is empty, it will start the seeding process for
- /// finding new peers.
- ///
- /// This will perform a backoff to prevent getting stuck in the loop
- /// if the seeding process couldn't find any peers.
- async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
- let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
- loop {
- let random_entry = self.random_entry(PENDING_ENTRY).await;
- match random_entry {
- Some(entry) => {
- backoff.reset();
- let endpoint = Endpoint::Tcp(entry.addr, entry.port);
- self.connect(&endpoint, Some(entry.key.into()), ex.clone())
- .await;
- }
- None => {
- backoff.sleep().await;
- self.start_seeding().await;
- }
- }
- }
- }
-
- /// Connect to the given endpoint using the connector
- async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) {
- let selfc = self.clone();
- let pid_cloned = pid.clone();
- let cback = |conn: Conn| async move {
- selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
- if let Some(pid) = pid_cloned {
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
- }
- Ok(())
- };
-
- let res = self.connector.connect_with_cback(ex, endpoint, cback).await;
-
- if let Some(pid) = &pid {
- match res {
- Ok(_) => {
- self.update_entry(pid, CONNECTED_ENTRY).await;
- }
- Err(_) => {
- self.update_entry(pid, UNREACHABLE_ENTRY).await;
- }
- }
- }
- }
-
- /// Starts seeding process.
- ///
- /// This method randomly selects a peer from the routing table and
- /// attempts to connect to that peer for the initial lookup. If the routing
- /// table doesn't have an available entry, it will connect to one of the
- /// provided bootstrap endpoints in the `Config` and initiate the lookup.
- async fn start_seeding(&self) {
- match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
- Some(entry) => {
- let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
- if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
- self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
- error!("Failed to do lookup: {endpoint}: {err}");
- }
- }
- None => {
- let peers = &self.config.bootstrap_peers;
- for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
- if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
- error!("Failed to do lookup: {endpoint}: {err}");
- }
- }
- }
- }
- }
-
- /// Returns a random entry from routing table.
- async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
- self.table.lock().await.random_entry(entry_flag).cloned()
- }
-
- /// Update the entry status
- async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
- let table = &mut self.table.lock().await;
- table.update_entry(&pid.0, entry_flag);
- }
-}
diff --git a/karyons_p2p/src/discovery/refresh.rs b/karyons_p2p/src/discovery/refresh.rs
deleted file mode 100644
index 7582c84..0000000
--- a/karyons_p2p/src/discovery/refresh.rs
+++ /dev/null
@@ -1,289 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use bincode::{Decode, Encode};
-use log::{error, info, trace};
-use rand::{rngs::OsRng, RngCore};
-use smol::{
- lock::{Mutex, RwLock},
- stream::StreamExt,
- Timer,
-};
-
-use karyons_core::{
- async_utils::{timeout, Backoff, TaskGroup, TaskResult},
- utils::{decode, encode},
- Executor,
-};
-
-use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
-
-/// Maximum failures for an entry before removing it from the routing table.
-pub const MAX_FAILURES: u32 = 3;
-
-/// Ping message size
-const PINGMSG_SIZE: usize = 32;
-
-use crate::{
- monitor::{ConnEvent, DiscoveryEvent, Monitor},
- routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY},
- Config, Error, Result,
-};
-
-#[derive(Decode, Encode, Debug, Clone)]
-pub struct PingMsg(pub [u8; 32]);
-
-#[derive(Decode, Encode, Debug)]
-pub struct PongMsg(pub [u8; 32]);
-
-pub struct RefreshService {
- /// Routing table
- table: Arc<Mutex<RoutingTable>>,
-
- /// Resolved listen endpoint
- listen_endpoint: Option<RwLock<Endpoint>>,
-
- /// Managing spawned tasks.
- task_group: TaskGroup,
-
- /// Holds the configuration for the P2P network.
- config: Arc<Config>,
-
- /// Responsible for network and system monitoring.
- monitor: Arc<Monitor>,
-}
-
-impl RefreshService {
- /// Creates a new refresh service
- pub fn new(
- config: Arc<Config>,
- table: Arc<Mutex<RoutingTable>>,
- monitor: Arc<Monitor>,
- ) -> Self {
- let listen_endpoint = config
- .listen_endpoint
- .as_ref()
- .map(|endpoint| RwLock::new(endpoint.clone()));
-
- Self {
- table,
- listen_endpoint,
- task_group: TaskGroup::new(),
- config,
- monitor,
- }
- }
-
- /// Start the refresh service
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await;
- let addr = endpoint.addr()?;
- let port = self.config.discovery_port;
-
- let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(addr.clone(), port),
- |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Listen loop stopped: {err}");
- }
- },
- );
- }
-
- let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.refresh_loop(ex.clone()),
- |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Refresh loop stopped: {err}");
- }
- },
- );
-
- Ok(())
- }
-
- /// Set the resolved listen endpoint.
- pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) {
- if let Some(endpoint) = &self.listen_endpoint {
- *endpoint.write().await = resolved_endpoint.clone();
- }
- }
-
- /// Shuts down the refresh service
- pub async fn shutdown(&self) {
- self.task_group.cancel().await;
- }
-
- /// Initiates periodic refreshing of the routing table. This function will
- /// select 8 random entries from each bucket in the routing table and start
- /// sending Ping messages to the entries.
- async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
- let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
- loop {
- timer.next().await;
- trace!("Start refreshing the routing table...");
-
- self.monitor
- .notify(&DiscoveryEvent::RefreshStarted.into())
- .await;
-
- let table = self.table.lock().await;
- let mut entries: Vec<BucketEntry> = vec![];
- for bucket in table.iter() {
- for entry in bucket.random_iter(8) {
- entries.push(entry.clone())
- }
- }
- drop(table);
-
- self.clone().do_refresh(&entries, ex.clone()).await;
- }
- }
-
- /// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt to that entry.
- async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) {
- for chunk in entries.chunks(16) {
- let mut tasks = Vec::new();
- for bucket_entry in chunk {
- if bucket_entry.is_connected() {
- continue;
- }
-
- if bucket_entry.failures >= MAX_FAILURES {
- self.table
- .lock()
- .await
- .remove_entry(&bucket_entry.entry.key);
- return;
- }
-
- tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
- }
-
- for task in tasks {
- task.await;
- }
- }
- }
-
- /// Initiates refresh for a specific entry within the routing table. It
- /// updates the routing table according to the result.
- async fn refresh_entry(self: Arc<Self>, bucket_entry: BucketEntry) {
- let key = &bucket_entry.entry.key;
- match self.connect(&bucket_entry.entry).await {
- Ok(_) => {
- self.table.lock().await.update_entry(key, PENDING_ENTRY);
- }
- Err(err) => {
- trace!("Failed to refresh entry {:?}: {err}", key);
- let table = &mut self.table.lock().await;
- if bucket_entry.failures >= MAX_FAILURES {
- table.remove_entry(key);
- return;
- }
- table.update_entry(key, UNREACHABLE_ENTRY);
- }
- }
- }
-
- /// Initiates a UDP connection with the entry and attempts to send a Ping
- /// message. If it fails, it retries according to the allowed retries
- /// specified in the Config, with backoff between each retry.
- async fn connect(&self, entry: &Entry) -> Result<()> {
- let mut retry = 0;
- let conn = dial_udp(&entry.addr, &entry.discovery_port).await?;
- let backoff = Backoff::new(100, 5000);
- while retry < self.config.refresh_connect_retries {
- match self.send_ping_msg(&conn).await {
- Ok(()) => return Ok(()),
- Err(Error::KaryonsNet(NetError::Timeout)) => {
- retry += 1;
- backoff.sleep().await;
- }
- Err(err) => {
- return Err(err);
- }
- }
- }
-
- Err(NetError::Timeout.into())
- }
-
- /// Set up a UDP listener and start listening for Ping messages from other
- /// peers.
- async fn listen_loop(self: Arc<Self>, addr: Addr, port: Port) -> Result<()> {
- let endpoint = Endpoint::Udp(addr.clone(), port);
- let conn = match listen_udp(&addr, &port).await {
- Ok(c) => {
- self.monitor
- .notify(&ConnEvent::Listening(endpoint.clone()).into())
- .await;
- c
- }
- Err(err) => {
- self.monitor
- .notify(&ConnEvent::ListenFailed(endpoint.clone()).into())
- .await;
- return Err(err.into());
- }
- };
- info!("Start listening on {endpoint}");
-
- loop {
- let res = self.listen_to_ping_msg(&conn).await;
- if let Err(err) = res {
- trace!("Failed to handle ping msg {err}");
- self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
- }
- }
- }
-
- /// Listen to receive a Ping message and respond with a Pong message.
- async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> {
- let mut buf = [0; PINGMSG_SIZE];
- let (_, endpoint) = conn.recv_from(&mut buf).await?;
-
- self.monitor
- .notify(&ConnEvent::Accepted(endpoint.clone()).into())
- .await;
-
- let (ping_msg, _) = decode::<PingMsg>(&buf)?;
-
- let pong_msg = PongMsg(ping_msg.0);
- let buffer = encode(&pong_msg)?;
-
- conn.send_to(&buffer, &endpoint).await?;
-
- self.monitor
- .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
- .await;
- Ok(())
- }
-
- /// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> {
- let mut nonce: [u8; 32] = [0; 32];
- RngCore::fill_bytes(&mut OsRng, &mut nonce);
-
- let ping_msg = PingMsg(nonce);
- let buffer = encode(&ping_msg)?;
- conn.send(&buffer).await?;
-
- let buf = &mut [0; PINGMSG_SIZE];
- let t = Duration::from_secs(self.config.refresh_response_timeout);
- timeout(t, conn.recv(buf)).await??;
-
- let (pong_msg, _) = decode::<PongMsg>(buf)?;
-
- if ping_msg.0 != pong_msg.0 {
- return Err(Error::InvalidPongMsg);
- }
-
- Ok(())
- }
-}