aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/lookup.rs15
-rw-r--r--p2p/src/discovery/mod.rs51
-rw-r--r--p2p/src/discovery/refresh.rs38
3 files changed, 55 insertions, 49 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 94da900..52aa339 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,7 +5,7 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyons_core::{async_utils::timeout, utils::decode, Executor};
+use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor};
use karyons_net::{Conn, Endpoint};
@@ -59,15 +59,18 @@ impl LookupService {
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
) -> Self {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
+ let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
+
let connector = Connector::new(
config.lookup_connect_retries,
outbound_slots.clone(),
monitor.clone(),
+ ex,
);
let listen_endpoint = config
@@ -88,8 +91,8 @@ impl LookupService {
}
/// Start the lookup service.
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- self.start_listener(ex).await?;
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
+ self.start_listener().await?;
Ok(())
}
@@ -233,7 +236,7 @@ impl LookupService {
}
/// Start a listener.
- async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn start_listener(self: &Arc<Self>) -> Result<()> {
let addr = match &self.listen_endpoint {
Some(a) => a.read().await.addr()?.clone(),
None => return Ok(()),
@@ -248,7 +251,7 @@ impl LookupService {
Ok(())
};
- self.listener.start(ex, endpoint.clone(), callback).await?;
+ self.listener.start(endpoint.clone(), callback).await?;
Ok(())
}
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7b8e7dc..7d37eec 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -9,7 +9,7 @@ use smol::lock::Mutex;
use karyons_core::{
async_utils::{Backoff, TaskGroup, TaskResult},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{Conn, Endpoint};
@@ -57,7 +57,7 @@ pub struct Discovery {
pub(crate) outbound_slots: Arc<ConnectionSlots>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -70,6 +70,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
@@ -77,16 +78,23 @@ impl Discovery {
let table_key = peer_id.0;
let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
- let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone());
- let lookup_service =
- LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone());
+ let refresh_service =
+ RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
+ let lookup_service = LookupService::new(
+ peer_id,
+ table.clone(),
+ config.clone(),
+ monitor.clone(),
+ ex.clone(),
+ );
let connector = Connector::new(
config.max_connect_retries,
outbound_slots.clone(),
monitor.clone(),
+ ex.clone(),
);
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
+ let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
Arc::new(Self {
refresh_service: Arc::new(refresh_service),
@@ -97,13 +105,13 @@ impl Discovery {
outbound_slots,
connector,
listener,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
config,
})
}
/// Start the Discovery
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
// Check if the listen_endpoint is provided, and if so, start a listener.
if let Some(endpoint) = &self.config.listen_endpoint {
// Return an error if the discovery port is set to 0.
@@ -113,7 +121,7 @@ impl Discovery {
));
}
- let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?;
+ let resolved_endpoint = self.start_listener(endpoint).await?;
if endpoint.addr()? != resolved_endpoint.addr()? {
info!("Resolved listen endpoint: {resolved_endpoint}");
@@ -127,19 +135,19 @@ impl Discovery {
}
// Start the lookup service
- self.lookup_service.start(ex.clone()).await?;
+ self.lookup_service.start().await?;
// Start the refresh service
- self.refresh_service.start(ex.clone()).await?;
+ self.refresh_service.start().await?;
// Attempt to manually connect to peer endpoints provided in the Config.
for endpoint in self.config.peer_endpoints.iter() {
- let _ = self.connect(endpoint, None, ex.clone()).await;
+ let _ = self.connect(endpoint, None).await;
}
// Start connect loop
let selfc = self.clone();
self.task_group
- .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move {
+ .spawn(selfc.connect_loop(), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
@@ -159,18 +167,14 @@ impl Discovery {
}
/// Start a listener and on success, return the resolved endpoint.
- async fn start_listener(
- self: &Arc<Self>,
- endpoint: &Endpoint,
- ex: Executor<'_>,
- ) -> Result<Endpoint> {
+ async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
let callback = |conn: Conn| async move {
selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
Ok(())
};
- let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?;
+ let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
Ok(resolved_endpoint)
}
@@ -180,7 +184,7 @@ impl Discovery {
///
/// This will perform a backoff to prevent getting stuck in the loop
/// if the seeding process couldn't find any peers.
- async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
let random_entry = self.random_entry(PENDING_ENTRY).await;
@@ -188,8 +192,7 @@ impl Discovery {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
- self.connect(&endpoint, Some(entry.key.into()), ex.clone())
- .await;
+ self.connect(&endpoint, Some(entry.key.into())).await;
}
None => {
backoff.sleep().await;
@@ -200,7 +203,7 @@ impl Discovery {
}
/// Connect to the given endpoint using the connector
- async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) {
+ async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
let selfc = self.clone();
let pid_cloned = pid.clone();
let cback = |conn: Conn| async move {
@@ -211,7 +214,7 @@ impl Discovery {
Ok(())
};
- let res = self.connector.connect_with_cback(ex, endpoint, cback).await;
+ let res = self.connector.connect_with_cback(endpoint, cback).await;
if let Some(pid) = &pid {
match res {
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 7582c84..a708261 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -12,7 +12,7 @@ use smol::{
use karyons_core::{
async_utils::{timeout, Backoff, TaskGroup, TaskResult},
utils::{decode, encode},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
@@ -43,7 +43,10 @@ pub struct RefreshService {
listen_endpoint: Option<RwLock<Endpoint>>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
+
+ /// A global executor
+ executor: GlobalExecutor,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -58,6 +61,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
+ executor: GlobalExecutor,
) -> Self {
let listen_endpoint = config
.listen_endpoint
@@ -67,41 +71,36 @@ impl RefreshService {
Self {
table,
listen_endpoint,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(executor.clone()),
+ executor,
config,
monitor,
}
}
/// Start the refresh service
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
let endpoint = endpoint.read().await;
let addr = endpoint.addr()?;
let port = self.config.discovery_port;
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(addr.clone(), port),
- |res| async move {
+ self.task_group
+ .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- },
- );
+ });
}
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.refresh_loop(ex.clone()),
- |res| async move {
+ self.task_group
+ .spawn(selfc.refresh_loop(), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- },
- );
+ });
Ok(())
}
@@ -121,7 +120,7 @@ impl RefreshService {
/// Initiates periodic refreshing of the routing table. This function will
/// select 8 random entries from each bucket in the routing table and start
/// sending Ping messages to the entries.
- async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn refresh_loop(self: Arc<Self>) -> Result<()> {
let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
timer.next().await;
@@ -140,13 +139,14 @@ impl RefreshService {
}
drop(table);
- self.clone().do_refresh(&entries, ex.clone()).await;
+ self.clone().do_refresh(&entries).await;
}
}
/// Iterates over the entries and spawns a new task for each entry to
/// initiate a connection attempt to that entry.
- async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) {
+ async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
+ let ex = &self.executor;
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {