aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
commit998568ab76cc8ba36fe47d5fca17bcc997aa391c (patch)
treeeb6e1239498f8d7179eb2e3e0b74c6396bcea51b /p2p/src/discovery
parentc9785e8cc5b6a9a722ba0aff1eb33c2dbf020f2e (diff)
p2p: wrap the buckets with mutex in RoutingTable
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/lookup.rs28
-rw-r--r--p2p/src/discovery/mod.rs53
-rw-r--r--p2p/src/discovery/refresh.rs23
3 files changed, 31 insertions, 73 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 4a06083..3beea7e 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,10 +5,7 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::timeout,
crypto::KeyPair,
util::decode,
@@ -38,7 +35,7 @@ pub struct LookupService {
id: PeerID,
/// Routing Table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Listener
listener: Arc<Listener>,
@@ -63,7 +60,7 @@ impl LookupService {
pub fn new(
key_pair: &KeyPair,
id: &PeerID,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
ex: Executor,
@@ -158,12 +155,10 @@ impl LookupService {
}
}
- let mut table = self.table.lock().await;
for peer in peer_buffer.iter() {
- let result = table.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry {:?}", result);
}
- drop(table);
self.monitor
.notify(DiscoveryEvent::LookupSucceeded(
@@ -190,11 +185,10 @@ impl LookupService {
.connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
.await?;
- let table = self.table.lock().await;
for peer in peers {
if random_peers.contains(&peer)
|| peer.peer_id == self.id
- || table.contains_key(&peer.peer_id.0)
+ || self.table.contains_key(&peer.peer_id.0)
{
continue;
}
@@ -233,10 +227,6 @@ impl LookupService {
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
- self.monitor
- .notify(ConnEvent::Connected(endpoint.clone()))
- .await;
-
let result = self.handle_outbound(conn, target_peer_id).await;
self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
@@ -318,7 +308,7 @@ impl LookupService {
}
NetMsgCmd::Peer => {
let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
- let result = self.table.lock().await.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry result: {:?}", result);
}
c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
@@ -381,9 +371,9 @@ impl LookupService {
/// Sends a Peers msg.
async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Peers msg");
- let table = self.table.lock().await;
- let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
- drop(table);
+ let entries = self
+ .table
+ .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?)
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 19ae77a..64e5c14 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -7,7 +7,7 @@ use log::{error, info};
use rand::{rngs::OsRng, seq::SliceRandom};
use karyon_core::{
- async_runtime::{lock::Mutex, Executor},
+ async_runtime::Executor,
async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
@@ -22,8 +22,8 @@ use crate::{
message::NetMsg,
monitor::Monitor,
routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
- INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
+ UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
@@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;
pub struct Discovery {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Lookup Service
lookup_service: Arc<LookupService>,
@@ -80,7 +80,7 @@ impl Discovery {
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
let table_key = peer_id.0;
- let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
+ let table = Arc::new(RoutingTable::new(table_key));
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
@@ -201,8 +201,7 @@ impl Discovery {
async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
- let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
- match random_table_entry {
+ match self.table.random_entry(PENDING_ENTRY) {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -233,18 +232,13 @@ impl Discovery {
match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
}
Err(Error::PeerAlreadyConnected) => {
- // TODO: Use an appropriate status.
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
- }
- Err(_) => {
- selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
- }
- Ok(_) => {
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
+ selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
}
+ Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
Ok(())
@@ -257,12 +251,8 @@ impl Discovery {
if let Some(pid) = &pid {
match result {
- Ok(_) => {
- self.update_table_entry(pid, CONNECTED_ENTRY).await;
- }
- Err(_) => {
- self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
- }
+ Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
+ Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
}
}
}
@@ -274,16 +264,12 @@ impl Discovery {
/// table doesn't have an available entry, it will connect to one of the
/// provided bootstrap endpoints in the `Config` and initiate the lookup.
async fn start_seeding(&self) {
- match self
- .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
- .await
- {
+ match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
let peer_id = Some(entry.key.into());
if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
- self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
- .await;
+ self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
error!("Failed to do lookup: {endpoint}: {err}");
}
}
@@ -297,15 +283,4 @@ impl Discovery {
}
}
}
-
- /// Returns a random entry from routing table.
- async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
- self.table.lock().await.random_entry(entry_flag).cloned()
- }
-
- /// Update the entry status
- async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
- let table = &mut self.table.lock().await;
- table.update_entry(&pid.0, entry_flag);
- }
}
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 745a5d5..eec6743 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -5,10 +5,7 @@ use log::{error, info, trace};
use rand::{rngs::OsRng, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
@@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]);
pub struct RefreshService {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Resolved listen endpoint
listen_endpoint: Option<RwLock<Endpoint>>,
@@ -55,7 +52,7 @@ impl RefreshService {
/// Creates a new refresh service
pub fn new(
config: Arc<Config>,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
monitor: Arc<Monitor>,
executor: Executor,
) -> Self {
@@ -122,7 +119,7 @@ impl RefreshService {
self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
- for bucket in self.table.lock().await.iter() {
+ for bucket in self.table.buckets() {
for entry in bucket
.iter()
.filter(|e| !e.is_connected() && !e.is_incompatible())
@@ -145,10 +142,7 @@ impl RefreshService {
let mut tasks = Vec::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
- self.table
- .lock()
- .await
- .remove_entry(&bucket_entry.entry.key);
+ self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
@@ -167,16 +161,15 @@ impl RefreshService {
let key = &bucket_entry.entry.key;
match self.connect(&bucket_entry.entry).await {
Ok(_) => {
- self.table.lock().await.update_entry(key, PENDING_ENTRY);
+ self.table.update_entry(key, PENDING_ENTRY);
}
Err(err) => {
trace!("Failed to refresh entry {:?}: {err}", key);
- let table = &mut self.table.lock().await;
if bucket_entry.failures >= MAX_FAILURES {
- table.remove_entry(key);
+ self.table.remove_entry(key);
return;
}
- table.update_entry(key, UNREACHABLE_ENTRY);
+ self.table.update_entry(key, UNREACHABLE_ENTRY);
}
}
}