From 5c0abab1b7bf0f2858c451d6f0efc7ca0e138fc6 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 29 Jun 2024 21:16:46 +0200 Subject: use shadown variables to name clones and place them between {} when spawning new tasks --- p2p/examples/chat.rs | 42 ++++++++++++++------------ p2p/src/connector.rs | 19 ++++++------ p2p/src/discovery/lookup.rs | 12 ++++---- p2p/src/discovery/mod.rs | 70 ++++++++++++++++++++++++-------------------- p2p/src/discovery/refresh.rs | 45 ++++++++++++++-------------- p2p/src/listener.rs | 27 +++++++++-------- p2p/src/peer/mod.rs | 19 ++++++------ p2p/src/peer_pool.rs | 27 ++++++++++------- p2p/src/protocols/ping.rs | 6 ++-- 9 files changed, 148 insertions(+), 119 deletions(-) (limited to 'p2p') diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 4dd87c5..1ad215c 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -59,14 +59,16 @@ impl ChatProtocol { #[async_trait] impl Protocol for ChatProtocol { async fn start(self: Arc) -> Result<(), Error> { - let selfc = self.clone(); let stdin = io::stdin(); - let task = self.executor.spawn(async move { - loop { - let mut input = String::new(); - stdin.read_line(&mut input).await.unwrap(); - let msg = format!("> {}: {}", selfc.username, input.trim()); - selfc.peer.broadcast(&Self::id(), &msg).await; + let task = self.executor.spawn({ + let this = self.clone(); + async move { + loop { + let mut input = String::new(); + stdin.read_line(&mut input).await.unwrap(); + let msg = format!("> {}: {}", this.username, input.trim()); + this.peer.broadcast(&Self::id(), &msg).await; + } } }); @@ -126,23 +128,25 @@ fn main() { let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let ex_cloned = ex.clone(); run_executor( - async { - let username = cli.username; + { + let ex = ex.clone(); + async { + let username = cli.username; - // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone().into()); - backend.attach_protocol::(c).await.unwrap(); + // Attach the ChatProtocol + let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into()); + backend.attach_protocol::(c).await.unwrap(); - // Run the backend - backend.run().await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; + // Shutdown the backend + backend.shutdown().await; + } }, ex, ); diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index a44daea..cfa661b 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -127,17 +127,18 @@ impl Connector { { let conn = self.connect(endpoint, peer_id).await?; - let selfc = self.clone(); let endpoint = endpoint.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Outbound connection dropped: {err}"); + let on_disconnect = { + let this = self.clone(); + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + trace!("Outbound connection dropped: {err}"); + } + this.monitor + .notify(ConnEvent::Disconnected(endpoint.clone())) + .await; + this.connection_slots.remove().await; } - selfc - .monitor - .notify(ConnEvent::Disconnected(endpoint.clone())) - .await; - selfc.connection_slots.remove().await; }; self.task_group.spawn(callback(conn), on_disconnect); diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 8e06eef..9ddf614 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -283,11 +283,13 @@ impl LookupService { let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); - let selfc = self.clone(); - let callback = |conn: Conn| async move { - let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); - timeout(t, selfc.handle_inbound(conn)).await??; - Ok(()) + let callback = { + let this = self.clone(); + |conn: Conn| async move { + let t = Duration::from_secs(this.config.lookup_connection_lifespan); + timeout(t, this.handle_inbound(conn)).await??; + Ok(()) + } }; self.listener.start(endpoint.clone(), callback).await?; diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index dae4d3f..a9d99d6 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -154,13 +154,17 @@ impl Discovery { } // Start connect loop - let selfc = self.clone(); - self.task_group - .spawn(selfc.connect_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.connect_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -177,10 +181,12 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc, endpoint: &Endpoint) -> Result { - let selfc = self.clone(); - let callback = |c: Conn| async move { - selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; - Ok(()) + let callback = { + let this = self.clone(); + |c: Conn| async move { + this.conn_queue.handle(c, ConnDirection::Inbound).await?; + Ok(()) + } }; let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; @@ -212,31 +218,33 @@ impl Discovery { /// Connect to the given endpoint using the connector async fn connect(self: &Arc, endpoint: &Endpoint, pid: Option) { - let selfc = self.clone(); - let pid_c = pid.clone(); - let endpoint_c = endpoint.clone(); - let cback = |conn: Conn| async move { - let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - - // If the entry is not in the routing table, ignore the result - let pid = match pid_c { - Some(p) => p, - None => return Ok(()), - }; - - match result { - Err(Error::IncompatiblePeer) => { - error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); - } - Err(Error::PeerAlreadyConnected) => { - selfc.table.update_entry(&pid.0, CONNECTED_ENTRY) + let cback = { + let this = self.clone(); + let endpoint = endpoint.clone(); + let pid = pid.clone(); + |conn: Conn| async move { + let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await; + + // If the entry is not in the routing table, ignore the result + let pid = match pid { + Some(p) => p, + None => return Ok(()), + }; + + match result { + Err(Error::IncompatiblePeer) => { + error!("Failed to do handshake: {endpoint} incompatible peer"); + this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); + } + Err(Error::PeerAlreadyConnected) => { + this.table.update_entry(&pid.0, CONNECTED_ENTRY) + } + Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY), + Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY), } - Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), - Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY), - } - Ok(()) + Ok(()) + } }; let result = self diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index c1d222b..b4f5396 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -38,9 +38,6 @@ pub struct RefreshService { /// Managing spawned tasks. task_group: TaskGroup, - /// A global executor - executor: Executor, - /// Holds the configuration for the P2P network. config: Arc, @@ -65,7 +62,6 @@ impl RefreshService { table, listen_endpoint, task_group: TaskGroup::with_executor(executor.clone()), - executor, config, monitor, } @@ -74,24 +70,32 @@ impl RefreshService { /// Start the refresh service pub async fn start(self: &Arc) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { - let endpoint = endpoint.read().await; + let endpoint = endpoint.read().await.clone(); - let selfc = self.clone(); - self.task_group - .spawn(selfc.listen_loop(endpoint.clone()), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop(endpoint).await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }); + }, + ); } - let selfc = self.clone(); - self.task_group - .spawn(selfc.refresh_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.refresh_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -133,25 +137,22 @@ impl RefreshService { } } - /// Iterates over the entries and spawns a new task for each entry to - /// initiate a connection attempt. + /// Iterates over the entries and initiates a connection. async fn do_refresh(self: Arc, entries: &[BucketEntry]) { - let ex = &self.executor; - // Enforce a maximum of 16 concurrent connections. + use futures_util::stream::{FuturesUnordered, StreamExt}; + // Enforce a maximum of 16 connections. for chunk in entries.chunks(16) { - let mut tasks = Vec::new(); + let mut tasks = FuturesUnordered::new(); for bucket_entry in chunk { if bucket_entry.failures >= MAX_FAILURES { self.table.remove_entry(&bucket_entry.entry.key); continue; } - tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) + tasks.push(self.clone().refresh_entry(bucket_entry.clone())) } - for task in tasks { - let _ = task.await; - } + while tasks.next().await.is_some() {} } } diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 923eb18..8a5deaa 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -87,9 +87,13 @@ impl Listener { info!("Start listening on {resolved_endpoint}"); - let selfc = self.clone(); - self.task_group - .spawn(selfc.listen_loop(listener, callback), |_| async {}); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop(listener, callback).await } + }, + |_| async {}, + ); Ok(resolved_endpoint) } @@ -135,16 +139,15 @@ impl Listener { self.connection_slots.add(); - let selfc = self.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - debug!("Inbound connection dropped: {err}"); + let on_disconnect = { + let this = self.clone(); + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + debug!("Inbound connection dropped: {err}"); + } + this.monitor.notify(ConnEvent::Disconnected(endpoint)).await; + this.connection_slots.remove().await; } - selfc - .monitor - .notify(ConnEvent::Disconnected(endpoint)) - .await; - selfc.connection_slots.remove().await; }; let callback = callback.clone(); diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 2068789..6903294 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -209,16 +209,17 @@ impl Peer { self.protocol_ids.write().await.push(protocol_id.clone()); - let selfc = self.clone(); - let proto_idc = protocol_id.clone(); - - let on_failure = |result: TaskResult>| async move { - if let TaskResult::Completed(res) = result { - if res.is_err() { - error!("protocol {} stopped", proto_idc); + let on_failure = { + let this = self.clone(); + let protocol_id = protocol_id.clone(); + |result: TaskResult>| async move { + if let TaskResult::Completed(res) = result { + if res.is_err() { + error!("protocol {} stopped", protocol_id); + } + // Send a stop signal to read loop + let _ = this.stop_chan.0.try_send(res); } - // Send a stop signal to read loop - let _ = selfc.stop_chan.0.try_send(res); } }; diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 02a74ac..1f3ca55 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -78,11 +78,16 @@ impl PeerPool { }) } - /// Start + /// Starts the [`PeerPool`] pub async fn start(self: &Arc) -> Result<()> { self.setup_protocols().await?; - let selfc = self.clone(); - self.task_group.spawn(selfc.listen_loop(), |_| async {}); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop().await } + }, + |_| async {}, + ); Ok(()) } @@ -145,14 +150,16 @@ impl PeerPool { // Insert the new peer self.peers.write().await.insert(pid.clone(), peer.clone()); - let selfc = self.clone(); - let pid_c = pid.clone(); - let on_disconnect = |result| async move { - if let TaskResult::Completed(result) = result { - if let Err(err) = selfc.remove_peer(&pid_c).await { - error!("Failed to remove peer {pid_c}: {err}"); + let on_disconnect = { + let this = self.clone(); + let pid = pid.clone(); + |result| async move { + if let TaskResult::Completed(result) = result { + if let Err(err) = this.remove_peer(&pid).await { + error!("Failed to remove peer {pid}: {err}"); + } + let _ = disconnect_signal.send(result).await; } - let _ = disconnect_signal.send(result).await; } }; diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index ef1b54e..b800b23 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -128,9 +128,11 @@ impl Protocol for PingProtocol { let (pong_chan, pong_chan_recv) = async_channel::bounded(1); let (stop_signal_s, stop_signal) = async_channel::bounded::>(1); - let selfc = self.clone(); self.task_group.spawn( - selfc.clone().ping_loop(pong_chan_recv.clone()), + { + let this = self.clone(); + async move { this.ping_loop(pong_chan_recv.clone()).await } + }, |res| async move { if let TaskResult::Completed(result) = res { let _ = stop_signal_s.send(result).await; -- cgit v1.2.3