diff options
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r-- | p2p/src/discovery/mod.rs | 70 |
1 files changed, 39 insertions, 31 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index dae4d3f..a9d99d6 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -154,13 +154,17 @@ impl Discovery { } // Start connect loop - let selfc = self.clone(); - self.task_group - .spawn(selfc.connect_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.connect_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -177,10 +181,12 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { - let selfc = self.clone(); - let callback = |c: Conn<NetMsg>| async move { - selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; - Ok(()) + let callback = { + let this = self.clone(); + |c: Conn<NetMsg>| async move { + this.conn_queue.handle(c, ConnDirection::Inbound).await?; + Ok(()) + } }; let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; @@ -212,31 +218,33 @@ impl Discovery { /// Connect to the given endpoint using the connector async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { - let selfc = self.clone(); - let pid_c = pid.clone(); - let endpoint_c = endpoint.clone(); - let cback = |conn: Conn<NetMsg>| async move { - let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - - // If the entry is not in the routing table, ignore the result - let pid = match pid_c { - Some(p) => p, - None => return Ok(()), - }; - - match result { - Err(Error::IncompatiblePeer) => { - error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); - } - Err(Error::PeerAlreadyConnected) => { - selfc.table.update_entry(&pid.0, CONNECTED_ENTRY) + let cback = { + let this = self.clone(); + let endpoint = endpoint.clone(); + let pid = pid.clone(); + |conn: Conn<NetMsg>| async move { + let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await; + + // If the entry is not in the routing table, ignore the result + let pid = match pid { + Some(p) => p, + None => return Ok(()), + }; + + match result { + Err(Error::IncompatiblePeer) => { + error!("Failed to do handshake: {endpoint} incompatible peer"); + this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); + } + Err(Error::PeerAlreadyConnected) => { + this.table.update_entry(&pid.0, CONNECTED_ENTRY) + } + Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY), + Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY), } - Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), - Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY), - } - Ok(()) + Ok(()) + } }; let result = self |