aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs23
1 files changed, 8 insertions, 15 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 745a5d5..eec6743 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -5,10 +5,7 @@ use log::{error, info, trace};
use rand::{rngs::OsRng, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
@@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]);
pub struct RefreshService {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Resolved listen endpoint
listen_endpoint: Option<RwLock<Endpoint>>,
@@ -55,7 +52,7 @@ impl RefreshService {
/// Creates a new refresh service
pub fn new(
config: Arc<Config>,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
monitor: Arc<Monitor>,
executor: Executor,
) -> Self {
@@ -122,7 +119,7 @@ impl RefreshService {
self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
- for bucket in self.table.lock().await.iter() {
+ for bucket in self.table.buckets() {
for entry in bucket
.iter()
.filter(|e| !e.is_connected() && !e.is_incompatible())
@@ -145,10 +142,7 @@ impl RefreshService {
let mut tasks = Vec::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
- self.table
- .lock()
- .await
- .remove_entry(&bucket_entry.entry.key);
+ self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
@@ -167,16 +161,15 @@ impl RefreshService {
let key = &bucket_entry.entry.key;
match self.connect(&bucket_entry.entry).await {
Ok(_) => {
- self.table.lock().await.update_entry(key, PENDING_ENTRY);
+ self.table.update_entry(key, PENDING_ENTRY);
}
Err(err) => {
trace!("Failed to refresh entry {:?}: {err}", key);
- let table = &mut self.table.lock().await;
if bucket_entry.failures >= MAX_FAILURES {
- table.remove_entry(key);
+ self.table.remove_entry(key);
return;
}
- table.update_entry(key, UNREACHABLE_ENTRY);
+ self.table.update_entry(key, UNREACHABLE_ENTRY);
}
}
}