aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-29 21:16:46 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-29 21:16:46 +0200
commit5c0abab1b7bf0f2858c451d6f0efc7ca0e138fc6 (patch)
tree9d64a261ddd289560365b71f5d02d31df6c4a0ec /p2p
parentbcc6721257889f85f57af1b40351540585ffd41d (diff)
use shadown variables to name clones and place them between {} when spawning new tasks
Diffstat (limited to 'p2p')
-rw-r--r--p2p/examples/chat.rs42
-rw-r--r--p2p/src/connector.rs19
-rw-r--r--p2p/src/discovery/lookup.rs12
-rw-r--r--p2p/src/discovery/mod.rs70
-rw-r--r--p2p/src/discovery/refresh.rs45
-rw-r--r--p2p/src/listener.rs27
-rw-r--r--p2p/src/peer/mod.rs19
-rw-r--r--p2p/src/peer_pool.rs27
-rw-r--r--p2p/src/protocols/ping.rs6
9 files changed, 148 insertions, 119 deletions
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 4dd87c5..1ad215c 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -59,14 +59,16 @@ impl ChatProtocol {
#[async_trait]
impl Protocol for ChatProtocol {
async fn start(self: Arc<Self>) -> Result<(), Error> {
- let selfc = self.clone();
let stdin = io::stdin();
- let task = self.executor.spawn(async move {
- loop {
- let mut input = String::new();
- stdin.read_line(&mut input).await.unwrap();
- let msg = format!("> {}: {}", selfc.username, input.trim());
- selfc.peer.broadcast(&Self::id(), &msg).await;
+ let task = self.executor.spawn({
+ let this = self.clone();
+ async move {
+ loop {
+ let mut input = String::new();
+ stdin.read_line(&mut input).await.unwrap();
+ let msg = format!("> {}: {}", this.username, input.trim());
+ this.peer.broadcast(&Self::id(), &msg).await;
+ }
}
});
@@ -126,23 +128,25 @@ fn main() {
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let ex_cloned = ex.clone();
run_executor(
- async {
- let username = cli.username;
+ {
+ let ex = ex.clone();
+ async {
+ let username = cli.username;
- // Attach the ChatProtocol
- let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone().into());
- backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+ // Attach the ChatProtocol
+ let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into());
+ backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
- // Run the backend
- backend.run().await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
+ // Shutdown the backend
+ backend.shutdown().await;
+ }
},
ex,
);
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index a44daea..cfa661b 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -127,17 +127,18 @@ impl Connector {
{
let conn = self.connect(endpoint, peer_id).await?;
- let selfc = self.clone();
let endpoint = endpoint.clone();
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- trace!("Outbound connection dropped: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ trace!("Outbound connection dropped: {err}");
+ }
+ this.monitor
+ .notify(ConnEvent::Disconnected(endpoint.clone()))
+ .await;
+ this.connection_slots.remove().await;
}
- selfc
- .monitor
- .notify(ConnEvent::Disconnected(endpoint.clone()))
- .await;
- selfc.connection_slots.remove().await;
};
self.task_group.spawn(callback(conn), on_disconnect);
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 8e06eef..9ddf614 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -283,11 +283,13 @@ impl LookupService {
let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
- let selfc = self.clone();
- let callback = |conn: Conn<NetMsg>| async move {
- let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
- timeout(t, selfc.handle_inbound(conn)).await??;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |conn: Conn<NetMsg>| async move {
+ let t = Duration::from_secs(this.config.lookup_connection_lifespan);
+ timeout(t, this.handle_inbound(conn)).await??;
+ Ok(())
+ }
};
self.listener.start(endpoint.clone(), callback).await?;
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index dae4d3f..a9d99d6 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -154,13 +154,17 @@ impl Discovery {
}
// Start connect loop
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.connect_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.connect_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -177,10 +181,12 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
- let selfc = self.clone();
- let callback = |c: Conn<NetMsg>| async move {
- selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |c: Conn<NetMsg>| async move {
+ this.conn_queue.handle(c, ConnDirection::Inbound).await?;
+ Ok(())
+ }
};
let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
@@ -212,31 +218,33 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
- let selfc = self.clone();
- let pid_c = pid.clone();
- let endpoint_c = endpoint.clone();
- let cback = |conn: Conn<NetMsg>| async move {
- let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-
- // If the entry is not in the routing table, ignore the result
- let pid = match pid_c {
- Some(p) => p,
- None => return Ok(()),
- };
-
- match result {
- Err(Error::IncompatiblePeer) => {
- error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
- }
- Err(Error::PeerAlreadyConnected) => {
- selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ let cback = {
+ let this = self.clone();
+ let endpoint = endpoint.clone();
+ let pid = pid.clone();
+ |conn: Conn<NetMsg>| async move {
+ let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint} incompatible peer");
+ this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ this.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ }
+ Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
- Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
- Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
- }
- Ok(())
+ Ok(())
+ }
};
let result = self
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index c1d222b..b4f5396 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -38,9 +38,6 @@ pub struct RefreshService {
/// Managing spawned tasks.
task_group: TaskGroup,
- /// A global executor
- executor: Executor,
-
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -65,7 +62,6 @@ impl RefreshService {
table,
listen_endpoint,
task_group: TaskGroup::with_executor(executor.clone()),
- executor,
config,
monitor,
}
@@ -74,24 +70,32 @@ impl RefreshService {
/// Start the refresh service
pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await;
+ let endpoint = endpoint.read().await.clone();
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(endpoint).await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- });
+ },
+ );
}
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.refresh_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.refresh_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -133,25 +137,22 @@ impl RefreshService {
}
}
- /// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt.
+ /// Iterates over the entries and initiates a connection.
async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
- let ex = &self.executor;
- // Enforce a maximum of 16 concurrent connections.
+ use futures_util::stream::{FuturesUnordered, StreamExt};
+ // Enforce a maximum of 16 connections.
for chunk in entries.chunks(16) {
- let mut tasks = Vec::new();
+ let mut tasks = FuturesUnordered::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
- tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
+ tasks.push(self.clone().refresh_entry(bucket_entry.clone()))
}
- for task in tasks {
- let _ = task.await;
- }
+ while tasks.next().await.is_some() {}
}
}
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 923eb18..8a5deaa 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -87,9 +87,13 @@ impl Listener {
info!("Start listening on {resolved_endpoint}");
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(listener, callback), |_| async {});
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(listener, callback).await }
+ },
+ |_| async {},
+ );
Ok(resolved_endpoint)
}
@@ -135,16 +139,15 @@ impl Listener {
self.connection_slots.add();
- let selfc = self.clone();
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- debug!("Inbound connection dropped: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ debug!("Inbound connection dropped: {err}");
+ }
+ this.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
+ this.connection_slots.remove().await;
}
- selfc
- .monitor
- .notify(ConnEvent::Disconnected(endpoint))
- .await;
- selfc.connection_slots.remove().await;
};
let callback = callback.clone();
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 2068789..6903294 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -209,16 +209,17 @@ impl Peer {
self.protocol_ids.write().await.push(protocol_id.clone());
- let selfc = self.clone();
- let proto_idc = protocol_id.clone();
-
- let on_failure = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(res) = result {
- if res.is_err() {
- error!("protocol {} stopped", proto_idc);
+ let on_failure = {
+ let this = self.clone();
+ let protocol_id = protocol_id.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(res) = result {
+ if res.is_err() {
+ error!("protocol {} stopped", protocol_id);
+ }
+ // Send a stop signal to read loop
+ let _ = this.stop_chan.0.try_send(res);
}
- // Send a stop signal to read loop
- let _ = selfc.stop_chan.0.try_send(res);
}
};
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 02a74ac..1f3ca55 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -78,11 +78,16 @@ impl PeerPool {
})
}
- /// Start
+ /// Starts the [`PeerPool`]
pub async fn start(self: &Arc<Self>) -> Result<()> {
self.setup_protocols().await?;
- let selfc = self.clone();
- self.task_group.spawn(selfc.listen_loop(), |_| async {});
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop().await }
+ },
+ |_| async {},
+ );
Ok(())
}
@@ -145,14 +150,16 @@ impl PeerPool {
// Insert the new peer
self.peers.write().await.insert(pid.clone(), peer.clone());
- let selfc = self.clone();
- let pid_c = pid.clone();
- let on_disconnect = |result| async move {
- if let TaskResult::Completed(result) = result {
- if let Err(err) = selfc.remove_peer(&pid_c).await {
- error!("Failed to remove peer {pid_c}: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ let pid = pid.clone();
+ |result| async move {
+ if let TaskResult::Completed(result) = result {
+ if let Err(err) = this.remove_peer(&pid).await {
+ error!("Failed to remove peer {pid}: {err}");
+ }
+ let _ = disconnect_signal.send(result).await;
}
- let _ = disconnect_signal.send(result).await;
}
};
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index ef1b54e..b800b23 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -128,9 +128,11 @@ impl Protocol for PingProtocol {
let (pong_chan, pong_chan_recv) = async_channel::bounded(1);
let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1);
- let selfc = self.clone();
self.task_group.spawn(
- selfc.clone().ping_loop(pong_chan_recv.clone()),
+ {
+ let this = self.clone();
+ async move { this.ping_loop(pong_chan_recv.clone()).await }
+ },
|res| async move {
if let TaskResult::Completed(result) = res {
let _ = stop_signal_s.send(result).await;