aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r--p2p/src/discovery/mod.rs45
1 files changed, 26 insertions, 19 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 3e437aa..19ae77a 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -5,10 +5,10 @@ use std::sync::Arc;
use log::{error, info};
use rand::{rngs::OsRng, seq::SliceRandom};
-use smol::lock::Mutex;
use karyon_core::{
- async_util::{Backoff, Executor, TaskGroup, TaskResult},
+ async_runtime::{lock::Mutex, Executor},
+ async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
@@ -19,6 +19,7 @@ use crate::{
connection::{ConnDirection, ConnQueue},
connector::Connector,
listener::Listener,
+ message::NetMsg,
monitor::Monitor,
routing_table::{
Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
@@ -45,6 +46,7 @@ pub struct Discovery {
/// Connector
connector: Arc<Connector>,
+
/// Listener
listener: Arc<Listener>,
@@ -53,11 +55,12 @@ pub struct Discovery {
/// Inbound slots.
pub(crate) inbound_slots: Arc<ConnectionSlots>,
+
/// Outbound slots.
pub(crate) outbound_slots: Arc<ConnectionSlots>,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -71,7 +74,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: Executor<'static>,
+ ex: Executor,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
@@ -180,7 +183,7 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
- let callback = |c: Conn| async move {
+ let callback = |c: Conn<NetMsg>| async move {
selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
Ok(())
};
@@ -198,8 +201,8 @@ impl Discovery {
async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
- let random_entry = self.random_entry(PENDING_ENTRY).await;
- match random_entry {
+ let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
+ match random_table_entry {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -218,7 +221,7 @@ impl Discovery {
let selfc = self.clone();
let pid_c = pid.clone();
let endpoint_c = endpoint.clone();
- let cback = |conn: Conn| async move {
+ let cback = |conn: Conn<NetMsg>| async move {
let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
// If the entry is not in the routing table, ignore the result
@@ -230,17 +233,17 @@ impl Discovery {
match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
- // TODO: Use the appropriate status.
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ // TODO: Use an appropriate status.
+ selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
- selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
+ selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
}
Ok(_) => {
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
}
}
@@ -255,10 +258,10 @@ impl Discovery {
if let Some(pid) = &pid {
match result {
Ok(_) => {
- self.update_entry(pid, CONNECTED_ENTRY).await;
+ self.update_table_entry(pid, CONNECTED_ENTRY).await;
}
Err(_) => {
- self.update_entry(pid, UNREACHABLE_ENTRY).await;
+ self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
}
}
}
@@ -271,12 +274,16 @@ impl Discovery {
/// table doesn't have an available entry, it will connect to one of the
/// provided bootstrap endpoints in the `Config` and initiate the lookup.
async fn start_seeding(&self) {
- match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
+ match self
+ .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
+ .await
+ {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
let peer_id = Some(entry.key.into());
if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
- self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
+ self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
+ .await;
error!("Failed to do lookup: {endpoint}: {err}");
}
}
@@ -292,12 +299,12 @@ impl Discovery {
}
/// Returns a random entry from routing table.
- async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
+ async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
self.table.lock().await.random_entry(entry_flag).cloned()
}
/// Update the entry status
- async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
+ async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
let table = &mut self.table.lock().await;
table.update_entry(&pid.0, entry_flag);
}