aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r--p2p/src/discovery/mod.rs70
1 files changed, 39 insertions, 31 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index dae4d3f..a9d99d6 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -154,13 +154,17 @@ impl Discovery {
}
// Start connect loop
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.connect_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.connect_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -177,10 +181,12 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
- let selfc = self.clone();
- let callback = |c: Conn<NetMsg>| async move {
- selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |c: Conn<NetMsg>| async move {
+ this.conn_queue.handle(c, ConnDirection::Inbound).await?;
+ Ok(())
+ }
};
let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
@@ -212,31 +218,33 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
- let selfc = self.clone();
- let pid_c = pid.clone();
- let endpoint_c = endpoint.clone();
- let cback = |conn: Conn<NetMsg>| async move {
- let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-
- // If the entry is not in the routing table, ignore the result
- let pid = match pid_c {
- Some(p) => p,
- None => return Ok(()),
- };
-
- match result {
- Err(Error::IncompatiblePeer) => {
- error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
- }
- Err(Error::PeerAlreadyConnected) => {
- selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ let cback = {
+ let this = self.clone();
+ let endpoint = endpoint.clone();
+ let pid = pid.clone();
+ |conn: Conn<NetMsg>| async move {
+ let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint} incompatible peer");
+ this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ this.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ }
+ Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
- Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
- Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
- }
- Ok(())
+ Ok(())
+ }
};
let result = self