aboutsummaryrefslogtreecommitdiff
path: root/p2p/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-27 02:39:31 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-27 02:39:31 +0200
commitb8b5f00e9695f46ea30af3ce63aec6dd17f356ae (patch)
tree3f1b07539c248f9536f5c7b6e3870e235d4f49d7 /p2p/src
parent1a3ef2d77ab54bfe286f7400ac0cee2e25ea14e3 (diff)
Improve async channels error handling and replace unbounded channels with bounded channels
Remove all unbounded channels to prevent unbounded memory usage and potential crashes. Use `FuturesUnordered` for sending to multiple channels simultaneously. This prevents the sending loop from blocking if one channel is blocked, and helps handle errors properly.
Diffstat (limited to 'p2p/src')
-rw-r--r--p2p/src/discovery/lookup.rs23
1 files changed, 17 insertions, 6 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index a941986..8e06eef 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};
-use futures_util::{stream::FuturesUnordered, StreamExt};
+use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
@@ -146,7 +146,12 @@ impl LookupService {
};
let mut peer_buffer = vec![];
- self.self_lookup(&random_peers, &mut peer_buffer).await;
+ if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await {
+ self.monitor
+ .notify(DiscvEvent::LookupFailed(endpoint.clone()))
+ .await;
+ return Err(err);
+ }
while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
match random_peers.pop() {
@@ -201,14 +206,18 @@ impl LookupService {
}
/// Starts a self lookup
- async fn self_lookup(&self, random_peers: &Vec<PeerMsg>, peer_buffer: &mut Vec<PeerMsg>) {
- let mut tasks = FuturesUnordered::new();
+ async fn self_lookup(
+ &self,
+ random_peers: &Vec<PeerMsg>,
+ peer_buffer: &mut Vec<PeerMsg>,
+ ) -> Result<()> {
+ let mut results = FuturesUnordered::new();
for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
- tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
+ results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
}
- while let Some(result) = tasks.next().await {
+ while let Some(result) = results.next().await {
match result {
Ok(peers) => peer_buffer.extend(peers),
Err(err) => {
@@ -216,6 +225,8 @@ impl LookupService {
}
}
}
+
+ Ok(())
}
/// Connects to the given endpoint and initiates a lookup process for the