aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs38
1 files changed, 19 insertions, 19 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 7582c84..a708261 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -12,7 +12,7 @@ use smol::{
use karyons_core::{
async_utils::{timeout, Backoff, TaskGroup, TaskResult},
utils::{decode, encode},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
@@ -43,7 +43,10 @@ pub struct RefreshService {
listen_endpoint: Option<RwLock<Endpoint>>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
+
+ /// A global executor
+ executor: GlobalExecutor,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -58,6 +61,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
+ executor: GlobalExecutor,
) -> Self {
let listen_endpoint = config
.listen_endpoint
@@ -67,41 +71,36 @@ impl RefreshService {
Self {
table,
listen_endpoint,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(executor.clone()),
+ executor,
config,
monitor,
}
}
/// Start the refresh service
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
let endpoint = endpoint.read().await;
let addr = endpoint.addr()?;
let port = self.config.discovery_port;
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(addr.clone(), port),
- |res| async move {
+ self.task_group
+ .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- },
- );
+ });
}
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.refresh_loop(ex.clone()),
- |res| async move {
+ self.task_group
+ .spawn(selfc.refresh_loop(), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- },
- );
+ });
Ok(())
}
@@ -121,7 +120,7 @@ impl RefreshService {
/// Initiates periodic refreshing of the routing table. This function will
/// select 8 random entries from each bucket in the routing table and start
/// sending Ping messages to the entries.
- async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn refresh_loop(self: Arc<Self>) -> Result<()> {
let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
timer.next().await;
@@ -140,13 +139,14 @@ impl RefreshService {
}
drop(table);
- self.clone().do_refresh(&entries, ex.clone()).await;
+ self.clone().do_refresh(&entries).await;
}
}
/// Iterates over the entries and spawns a new task for each entry to
/// initiate a connection attempt to that entry.
- async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) {
+ async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
+ let ex = &self.executor;
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {