Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r--  p2p/src/discovery/mod.rs  262
1 file changed, 262 insertions(+), 0 deletions(-)
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
new file mode 100644
index 0000000..94b350b
--- /dev/null
+++ b/p2p/src/discovery/mod.rs
@@ -0,0 +1,262 @@
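+//! Peer discovery module.
+//!
+//! `Discovery` ties together the routing table, the lookup and refresh
+//! services, the connector/listener pair, and the connection queue, and
+//! drives an outbound connect loop over discovered peers.
+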
+mod lookup;
+mod refresh;
+
+use std::sync::Arc;
+
+use log::{error, info};
+use rand::{rngs::OsRng, seq::SliceRandom};
+use smol::lock::Mutex;
+
+use karyons_core::{
+ async_utils::{Backoff, TaskGroup, TaskResult},
+ Executor,
+};
+
+use karyons_net::{Conn, Endpoint};
+
+use crate::{
+ config::Config,
+ monitor::Monitor,
+ net::ConnQueue,
+ net::{ConnDirection, ConnectionSlots, Connector, Listener},
+ routing_table::{
+ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
+ UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ },
+ Error, PeerID, Result,
+};
+
+use lookup::LookupService;
+use refresh::RefreshService;
+
+pub type ArcDiscovery = Arc<Discovery>;
+
+pub struct Discovery {
+ /// Routing table
+ table: Arc<Mutex<RoutingTable>>,
+
+    /// Lookup service
+    lookup_service: Arc<LookupService>,
+
+    /// Refresh service
+    refresh_service: Arc<RefreshService>,
+
+    /// Connector
+    connector: Arc<Connector>,
+
+    /// Listener
+    listener: Arc<Listener>,
+
+    /// Connection queue
+    conn_queue: Arc<ConnQueue>,
+
+    /// Inbound slots
+    pub(crate) inbound_slots: Arc<ConnectionSlots>,
+
+    /// Outbound slots
+    pub(crate) outbound_slots: Arc<ConnectionSlots>,
+
+ /// Managing spawned tasks.
+ task_group: TaskGroup,
+
+ /// Holds the configuration for the P2P network.
+ config: Arc<Config>,
+}
+
+impl Discovery {
+    /// Creates a new `Discovery`.
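+    ///
+    /// A minimal construction sketch; `Config::default`, `Monitor::new`,
+    /// `ConnQueue::new`, and `PeerID::random` are illustrative names and
+    /// may not match the actual constructors:
+    ///
+    /// ```ignore
+    /// let config = Arc::new(Config::default());
+    /// let monitor = Arc::new(Monitor::new());
+    /// let conn_queue = Arc::new(ConnQueue::new());
+    /// let peer_id = PeerID::random();
+    /// let discovery = Discovery::new(&peer_id, conn_queue, config, monitor);
+    /// ```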
+ pub fn new(
+ peer_id: &PeerID,
+ conn_queue: Arc<ConnQueue>,
+ config: Arc<Config>,
+ monitor: Arc<Monitor>,
+ ) -> ArcDiscovery {
+ let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
+ let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
+
+ let table_key = peer_id.0;
+ let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
+
+ let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone());
+ let lookup_service =
+ LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone());
+
+ let connector = Connector::new(
+ config.max_connect_retries,
+ outbound_slots.clone(),
+ monitor.clone(),
+ );
+ let listener = Listener::new(inbound_slots.clone(), monitor.clone());
+
+ Arc::new(Self {
+ refresh_service: Arc::new(refresh_service),
+ lookup_service: Arc::new(lookup_service),
+ conn_queue,
+ table,
+ inbound_slots,
+ outbound_slots,
+ connector,
+ listener,
+ task_group: TaskGroup::new(),
+ config,
+ })
+ }
+
+    /// Starts the discovery service.
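+    ///
+    /// A usage sketch, assuming the caller already holds an executor `ex`
+    /// compatible with `karyons_core::Executor`:
+    ///
+    /// ```ignore
+    /// discovery.start(ex.clone()).await?;
+    /// // ... node runs ...
+    /// discovery.shutdown().await;
+    /// ```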
+ pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ // Check if the listen_endpoint is provided, and if so, start a listener.
+ if let Some(endpoint) = &self.config.listen_endpoint {
+ // Return an error if the discovery port is set to 0.
+ if self.config.discovery_port == 0 {
+ return Err(Error::Config(
+ "Please add a valid discovery port".to_string(),
+ ));
+ }
+
+ let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?;
+
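+            // The listener may resolve a wildcard or unspecified address to
+            // a concrete one; if so, tell the lookup and refresh services
+            // about the endpoint peers can actually reach.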
+ if endpoint.addr()? != resolved_endpoint.addr()? {
+ info!("Resolved listen endpoint: {resolved_endpoint}");
+ self.lookup_service
+ .set_listen_endpoint(&resolved_endpoint)
+ .await;
+ self.refresh_service
+ .set_listen_endpoint(&resolved_endpoint)
+ .await;
+ }
+ }
+
+ // Start the lookup service
+ self.lookup_service.start(ex.clone()).await?;
+ // Start the refresh service
+ self.refresh_service.start(ex.clone()).await?;
+
+ // Attempt to manually connect to peer endpoints provided in the Config.
+ for endpoint in self.config.peer_endpoints.iter() {
+            self.connect(endpoint, None, ex.clone()).await;
+ }
+
+ // Start connect loop
+ let selfc = self.clone();
+ self.task_group
+ .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ error!("Connect loop stopped: {err}");
+ }
+ });
+
+ Ok(())
+ }
+
+ /// Shuts down the discovery
+ pub async fn shutdown(&self) {
+ self.task_group.cancel().await;
+ self.connector.shutdown().await;
+ self.listener.shutdown().await;
+
+ self.refresh_service.shutdown().await;
+ self.lookup_service.shutdown().await;
+ }
+
+    /// Starts a listener and, on success, returns the resolved endpoint.
+ async fn start_listener(
+ self: &Arc<Self>,
+ endpoint: &Endpoint,
+ ex: Executor<'_>,
+ ) -> Result<Endpoint> {
+ let selfc = self.clone();
+ let callback = |conn: Conn| async move {
+ selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
+ Ok(())
+ };
+
+ let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?;
+ Ok(resolved_endpoint)
+ }
+
+    /// Attempts to connect to a random peer from the routing table. If the
+    /// routing table has no pending entries, it starts the seeding process
+    /// to find new peers.
+    ///
+    /// A backoff prevents the loop from spinning when the seeding process
+    /// couldn't find any peers.
+ async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
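+        // Backoff between seeding attempts: both arguments are assumed to be
+        // in milliseconds, starting at 500 and capped at the configured
+        // seeding interval (given in seconds, hence the `* 1000`).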
+ let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
+ loop {
+ let random_entry = self.random_entry(PENDING_ENTRY).await;
+ match random_entry {
+ Some(entry) => {
+ backoff.reset();
+ let endpoint = Endpoint::Tcp(entry.addr, entry.port);
+ self.connect(&endpoint, Some(entry.key.into()), ex.clone())
+ .await;
+ }
+ None => {
+ backoff.sleep().await;
+ self.start_seeding().await;
+ }
+ }
+ }
+ }
+
+    /// Connects to the given endpoint using the connector.
+ async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) {
+ let selfc = self.clone();
+ let pid_cloned = pid.clone();
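+        // The callback runs once the connection queue is done handling the
+        // outbound connection; at that point the peer entry is flipped back
+        // to disconnected so the connect loop can retry it later.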
+ let cback = |conn: Conn| async move {
+ selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
+ if let Some(pid) = pid_cloned {
+ selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ }
+ Ok(())
+ };
+
+ let res = self.connector.connect_with_cback(ex, endpoint, cback).await;
+
+ if let Some(pid) = &pid {
+ match res {
+ Ok(_) => {
+ self.update_entry(pid, CONNECTED_ENTRY).await;
+ }
+ Err(_) => {
+ self.update_entry(pid, UNREACHABLE_ENTRY).await;
+ }
+ }
+ }
+ }
+
+    /// Starts the seeding process.
+ ///
+ /// This method randomly selects a peer from the routing table and
+ /// attempts to connect to that peer for the initial lookup. If the routing
+ /// table doesn't have an available entry, it will connect to one of the
+ /// provided bootstrap endpoints in the `Config` and initiate the lookup.
+ async fn start_seeding(&self) {
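+        // The status flags are combined bitwise, so this picks a random
+        // entry that is either pending or already connected.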
+ match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
+ Some(entry) => {
+ let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
+ if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
+ self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
+                    error!("Lookup failed for {endpoint}: {err}");
+ }
+ }
+ None => {
+ let peers = &self.config.bootstrap_peers;
+ for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
+ if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
+                        error!("Lookup failed for {endpoint}: {err}");
+ }
+ }
+ }
+ }
+ }
+
+    /// Returns a random entry from the routing table.
+ async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
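+        // Clone the selected entry so the table lock is dropped before
+        // returning it to the caller.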
+ self.table.lock().await.random_entry(entry_flag).cloned()
+ }
+
+    /// Updates the entry status in the routing table.
+ async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
+        let mut table = self.table.lock().await;
+ table.update_entry(&pid.0, entry_flag);
+ }
+}