aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/lookup.rs12
-rw-r--r--p2p/src/discovery/mod.rs70
-rw-r--r--p2p/src/discovery/refresh.rs45
3 files changed, 69 insertions, 58 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 8e06eef..9ddf614 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -283,11 +283,13 @@ impl LookupService {
let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
- let selfc = self.clone();
- let callback = |conn: Conn<NetMsg>| async move {
- let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
- timeout(t, selfc.handle_inbound(conn)).await??;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |conn: Conn<NetMsg>| async move {
+ let t = Duration::from_secs(this.config.lookup_connection_lifespan);
+ timeout(t, this.handle_inbound(conn)).await??;
+ Ok(())
+ }
};
self.listener.start(endpoint.clone(), callback).await?;
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index dae4d3f..a9d99d6 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -154,13 +154,17 @@ impl Discovery {
}
// Start connect loop
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.connect_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.connect_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -177,10 +181,12 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
- let selfc = self.clone();
- let callback = |c: Conn<NetMsg>| async move {
- selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |c: Conn<NetMsg>| async move {
+ this.conn_queue.handle(c, ConnDirection::Inbound).await?;
+ Ok(())
+ }
};
let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
@@ -212,31 +218,33 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
- let selfc = self.clone();
- let pid_c = pid.clone();
- let endpoint_c = endpoint.clone();
- let cback = |conn: Conn<NetMsg>| async move {
- let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-
- // If the entry is not in the routing table, ignore the result
- let pid = match pid_c {
- Some(p) => p,
- None => return Ok(()),
- };
-
- match result {
- Err(Error::IncompatiblePeer) => {
- error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
- }
- Err(Error::PeerAlreadyConnected) => {
- selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ let cback = {
+ let this = self.clone();
+ let endpoint = endpoint.clone();
+ let pid = pid.clone();
+ |conn: Conn<NetMsg>| async move {
+ let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint} incompatible peer");
+ this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ this.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ }
+ Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
- Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
- Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
- }
- Ok(())
+ Ok(())
+ }
};
let result = self
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index c1d222b..b4f5396 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -38,9 +38,6 @@ pub struct RefreshService {
/// Managing spawned tasks.
task_group: TaskGroup,
- /// A global executor
- executor: Executor,
-
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -65,7 +62,6 @@ impl RefreshService {
table,
listen_endpoint,
task_group: TaskGroup::with_executor(executor.clone()),
- executor,
config,
monitor,
}
@@ -74,24 +70,32 @@ impl RefreshService {
/// Start the refresh service
pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await;
+ let endpoint = endpoint.read().await.clone();
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(endpoint).await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- });
+ },
+ );
}
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.refresh_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.refresh_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -133,25 +137,22 @@ impl RefreshService {
}
}
- /// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt.
+ /// Iterates over the entries and initiates a connection.
async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
- let ex = &self.executor;
- // Enforce a maximum of 16 concurrent connections.
+ use futures_util::stream::{FuturesUnordered, StreamExt};
+ // Enforce a maximum of 16 connections.
for chunk in entries.chunks(16) {
- let mut tasks = Vec::new();
+ let mut tasks = FuturesUnordered::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
- tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
+ tasks.push(self.clone().refresh_entry(bucket_entry.clone()))
}
- for task in tasks {
- let _ = task.await;
- }
+ while tasks.next().await.is_some() {}
}
}