diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
commit | b8b5f00e9695f46ea30af3ce63aec6dd17f356ae (patch) | |
tree | 3f1b07539c248f9536f5c7b6e3870e235d4f49d7 /p2p | |
parent | 1a3ef2d77ab54bfe286f7400ac0cee2e25ea14e3 (diff) |
Improve async channels error handling and replace unbounded channels with bounded channels
Remove all unbounded channels to prevent unbounded memory usage and
potential crashes.
Use `FuturesUnordered` for sending to multiple channels simultaneously.
This prevents the sending loop from blocking if one channel is blocked,
and helps handle errors properly.
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/src/discovery/lookup.rs | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index a941986..8e06eef 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use futures_util::{stream::FuturesUnordered, StreamExt}; +use futures_util::stream::{FuturesUnordered, StreamExt}; use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; @@ -146,7 +146,12 @@ impl LookupService { }; let mut peer_buffer = vec![]; - self.self_lookup(&random_peers, &mut peer_buffer).await; + if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await { + self.monitor + .notify(DiscvEvent::LookupFailed(endpoint.clone())) + .await; + return Err(err); + } while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG { match random_peers.pop() { @@ -201,14 +206,18 @@ impl LookupService { } /// Starts a self lookup - async fn self_lookup(&self, random_peers: &Vec<PeerMsg>, peer_buffer: &mut Vec<PeerMsg>) { - let mut tasks = FuturesUnordered::new(); + async fn self_lookup( + &self, + random_peers: &Vec<PeerMsg>, + peer_buffer: &mut Vec<PeerMsg>, + ) -> Result<()> { + let mut results = FuturesUnordered::new(); for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); - tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id)) + results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id)) } - while let Some(result) = tasks.next().await { + while let Some(result) = results.next().await { match result { Ok(peers) => peer_buffer.extend(peers), Err(err) => { @@ -216,6 +225,8 @@ impl LookupService { } } } + + Ok(()) } /// Connects to the given endpoint and initiates a lookup process for the |