aboutsummaryrefslogtreecommitdiff
path: root/p2p/src
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
commit998568ab76cc8ba36fe47d5fca17bcc997aa391c (patch)
treeeb6e1239498f8d7179eb2e3e0b74c6396bcea51b /p2p/src
parentc9785e8cc5b6a9a722ba0aff1eb33c2dbf020f2e (diff)
p2p: wrap the buckets with mutex in RoutingTable
Diffstat (limited to 'p2p/src')
-rw-r--r--p2p/src/discovery/lookup.rs28
-rw-r--r--p2p/src/discovery/mod.rs53
-rw-r--r--p2p/src/discovery/refresh.rs23
-rw-r--r--p2p/src/routing_table/mod.rs69
4 files changed, 71 insertions, 102 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 4a06083..3beea7e 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,10 +5,7 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::timeout,
crypto::KeyPair,
util::decode,
@@ -38,7 +35,7 @@ pub struct LookupService {
id: PeerID,
/// Routing Table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Listener
listener: Arc<Listener>,
@@ -63,7 +60,7 @@ impl LookupService {
pub fn new(
key_pair: &KeyPair,
id: &PeerID,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
ex: Executor,
@@ -158,12 +155,10 @@ impl LookupService {
}
}
- let mut table = self.table.lock().await;
for peer in peer_buffer.iter() {
- let result = table.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry {:?}", result);
}
- drop(table);
self.monitor
.notify(DiscoveryEvent::LookupSucceeded(
@@ -190,11 +185,10 @@ impl LookupService {
.connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
.await?;
- let table = self.table.lock().await;
for peer in peers {
if random_peers.contains(&peer)
|| peer.peer_id == self.id
- || table.contains_key(&peer.peer_id.0)
+ || self.table.contains_key(&peer.peer_id.0)
{
continue;
}
@@ -233,10 +227,6 @@ impl LookupService {
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
- self.monitor
- .notify(ConnEvent::Connected(endpoint.clone()))
- .await;
-
let result = self.handle_outbound(conn, target_peer_id).await;
self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
@@ -318,7 +308,7 @@ impl LookupService {
}
NetMsgCmd::Peer => {
let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
- let result = self.table.lock().await.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry result: {:?}", result);
}
c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
@@ -381,9 +371,9 @@ impl LookupService {
/// Sends a Peers msg.
async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Peers msg");
- let table = self.table.lock().await;
- let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
- drop(table);
+ let entries = self
+ .table
+ .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?)
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 19ae77a..64e5c14 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -7,7 +7,7 @@ use log::{error, info};
use rand::{rngs::OsRng, seq::SliceRandom};
use karyon_core::{
- async_runtime::{lock::Mutex, Executor},
+ async_runtime::Executor,
async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
@@ -22,8 +22,8 @@ use crate::{
message::NetMsg,
monitor::Monitor,
routing_table::{
- Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
- INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+ RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
+ UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
},
slots::ConnectionSlots,
Error, PeerID, Result,
@@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;
pub struct Discovery {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Lookup Service
lookup_service: Arc<LookupService>,
@@ -80,7 +80,7 @@ impl Discovery {
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
let table_key = peer_id.0;
- let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
+ let table = Arc::new(RoutingTable::new(table_key));
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
@@ -201,8 +201,7 @@ impl Discovery {
async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
- let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
- match random_table_entry {
+ match self.table.random_entry(PENDING_ENTRY) {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -233,18 +232,13 @@ impl Discovery {
match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
}
Err(Error::PeerAlreadyConnected) => {
- // TODO: Use an appropriate status.
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
- }
- Err(_) => {
- selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
- }
- Ok(_) => {
- selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
+ selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
}
+ Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
Ok(())
@@ -257,12 +251,8 @@ impl Discovery {
if let Some(pid) = &pid {
match result {
- Ok(_) => {
- self.update_table_entry(pid, CONNECTED_ENTRY).await;
- }
- Err(_) => {
- self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
- }
+ Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
+ Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
}
}
}
@@ -274,16 +264,12 @@ impl Discovery {
/// table doesn't have an available entry, it will connect to one of the
/// provided bootstrap endpoints in the `Config` and initiate the lookup.
async fn start_seeding(&self) {
- match self
- .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
- .await
- {
+ match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
let peer_id = Some(entry.key.into());
if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
- self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
- .await;
+ self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
error!("Failed to do lookup: {endpoint}: {err}");
}
}
@@ -297,15 +283,4 @@ impl Discovery {
}
}
}
-
- /// Returns a random entry from routing table.
- async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
- self.table.lock().await.random_entry(entry_flag).cloned()
- }
-
- /// Update the entry status
- async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
- let table = &mut self.table.lock().await;
- table.update_entry(&pid.0, entry_flag);
- }
}
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 745a5d5..eec6743 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -5,10 +5,7 @@ use log::{error, info, trace};
use rand::{rngs::OsRng, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
@@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]);
pub struct RefreshService {
/// Routing table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Resolved listen endpoint
listen_endpoint: Option<RwLock<Endpoint>>,
@@ -55,7 +52,7 @@ impl RefreshService {
/// Creates a new refresh service
pub fn new(
config: Arc<Config>,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
monitor: Arc<Monitor>,
executor: Executor,
) -> Self {
@@ -122,7 +119,7 @@ impl RefreshService {
self.monitor.notify(DiscoveryEvent::RefreshStarted).await;
let mut entries: Vec<BucketEntry> = vec![];
- for bucket in self.table.lock().await.iter() {
+ for bucket in self.table.buckets() {
for entry in bucket
.iter()
.filter(|e| !e.is_connected() && !e.is_incompatible())
@@ -145,10 +142,7 @@ impl RefreshService {
let mut tasks = Vec::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
- self.table
- .lock()
- .await
- .remove_entry(&bucket_entry.entry.key);
+ self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
@@ -167,16 +161,15 @@ impl RefreshService {
let key = &bucket_entry.entry.key;
match self.connect(&bucket_entry.entry).await {
Ok(_) => {
- self.table.lock().await.update_entry(key, PENDING_ENTRY);
+ self.table.update_entry(key, PENDING_ENTRY);
}
Err(err) => {
trace!("Failed to refresh entry {:?}: {err}", key);
- let table = &mut self.table.lock().await;
if bucket_entry.failures >= MAX_FAILURES {
- table.remove_entry(key);
+ self.table.remove_entry(key);
return;
}
- table.update_entry(key, UNREACHABLE_ENTRY);
+ self.table.update_entry(key, UNREACHABLE_ENTRY);
}
}
}
diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs
index bbf4801..49ddac5 100644
--- a/p2p/src/routing_table/mod.rs
+++ b/p2p/src/routing_table/mod.rs
@@ -1,18 +1,20 @@
-use std::net::IpAddr;
-
mod bucket;
mod entry;
+use std::net::IpAddr;
+
+use parking_lot::RwLock;
+
+use rand::{rngs::OsRng, seq::SliceRandom};
+
+use karyon_net::Addr;
+
pub use bucket::{
Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
};
pub use entry::{xor_distance, Entry, Key};
-use rand::{rngs::OsRng, seq::SliceRandom};
-
-use karyon_net::Addr;
-
use bucket::BUCKET_SIZE;
use entry::KEY_SIZE;
@@ -46,29 +48,30 @@ pub enum AddEntryResult {
#[derive(Debug)]
pub struct RoutingTable {
key: Key,
- buckets: Vec<Bucket>,
+ buckets: RwLock<Vec<Bucket>>,
}
impl RoutingTable {
/// Creates a new RoutingTable
pub fn new(key: Key) -> Self {
let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect();
- Self { key, buckets }
+ Self {
+ key,
+ buckets: RwLock::new(buckets),
+ }
}
/// Adds a new entry to the table and returns a result indicating success,
/// failure, or restrictions.
- pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult {
+ pub fn add_entry(&self, entry: Entry) -> AddEntryResult {
// Determine the index of the bucket where the entry should be placed.
let bucket_idx = match self.bucket_index(&entry.key) {
Some(i) => i,
None => return AddEntryResult::Ignored,
};
- let bucket = &self.buckets[bucket_idx];
-
// Check if the entry already exists in the bucket.
- if bucket.contains_key(&entry.key) {
+ if self.contains_key(&entry.key) {
return AddEntryResult::Exists;
}
@@ -77,7 +80,8 @@ impl RoutingTable {
return AddEntryResult::Restricted;
}
- let bucket = &mut self.buckets[bucket_idx];
+ let mut buckets = self.buckets.write();
+ let bucket = &mut buckets[bucket_idx];
// If the bucket has free space, add the entry and return success.
if bucket.len() < BUCKET_SIZE {
@@ -99,13 +103,14 @@ impl RoutingTable {
/// Check if the table contains the given key.
pub fn contains_key(&self, key: &Key) -> bool {
+ let buckets = self.buckets.read();
// Determine the bucket index for the given key.
let bucket_idx = match self.bucket_index(key) {
Some(bi) => bi,
None => return false,
};
- let bucket = &self.buckets[bucket_idx];
+ let bucket = &buckets[bucket_idx];
bucket.contains_key(key)
}
@@ -113,14 +118,15 @@ impl RoutingTable {
/// by the given key.
///
/// If the key is not found, no action is taken.
- pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
+ pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) {
+ let mut buckets = self.buckets.write();
// Determine the bucket index for the given key.
let bucket_idx = match self.bucket_index(key) {
Some(bi) => bi,
None => return,
};
- let bucket = &mut self.buckets[bucket_idx];
+ let bucket = &mut buckets[bucket_idx];
bucket.update_entry(key, entry_flag);
}
@@ -149,11 +155,12 @@ impl RoutingTable {
/// Returns a list of the closest entries to the given target key, limited by max_entries.
pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
+ let buckets = self.buckets.read();
let mut entries: Vec<Entry> = vec![];
// Collect entries
'outer: for idx in self.bucket_indexes(target_key) {
- let bucket = &self.buckets[idx];
+ let bucket = &buckets[idx];
for bucket_entry in bucket.iter() {
if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
continue;
@@ -175,30 +182,33 @@ impl RoutingTable {
}
/// Removes an entry with the given key from the routing table, if it exists.
- pub fn remove_entry(&mut self, key: &Key) {
+ pub fn remove_entry(&self, key: &Key) {
+ let mut buckets = self.buckets.write();
// Determine the bucket index for the given key.
let bucket_idx = match self.bucket_index(key) {
Some(bi) => bi,
None => return,
};
- let bucket = &mut self.buckets[bucket_idx];
+ let bucket = &mut buckets[bucket_idx];
bucket.remove(key);
}
/// Returns an iterator of entries.
- pub fn iter(&self) -> impl Iterator<Item = &Bucket> {
- self.buckets.iter()
+ /// FIXME: TODO: avoid cloning the data
+ pub fn buckets(&self) -> Vec<Bucket> {
+ self.buckets.read().clone()
}
/// Returns a random entry from the routing table.
- pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> {
- for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) {
+ pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
+ let buckets = self.buckets.read();
+ for bucket in buckets.choose_multiple(&mut OsRng, buckets.len()) {
for entry in bucket.random_iter(bucket.len()) {
if entry.status & entry_flag == 0 {
continue;
}
- return Some(&entry.entry);
+ return Some(entry.entry.clone());
}
}
@@ -230,12 +240,13 @@ impl RoutingTable {
/// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry
/// is considered restricted and returns true.
fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool {
+ let buckets = self.buckets.read();
let mut bucket_count = 0;
let mut table_count = 0;
// Iterate through the routing table's buckets and entries to check
// for subnet matches.
- for (i, bucket) in self.buckets.iter().enumerate() {
+ for (i, bucket) in buckets.iter().enumerate() {
for e in bucket.iter() {
// If there is a subnet match, update the counts.
let matched = subnet_match(&e.entry.addr, &entry.addr);
@@ -366,7 +377,7 @@ mod tests {
}
fn table(&self) -> RoutingTable {
- let mut table = RoutingTable::new(self.local_key.clone());
+ let table = RoutingTable::new(self.local_key.clone());
for entry in self.entries() {
let res = table.add_entry(entry);
@@ -422,11 +433,11 @@ mod tests {
#[test]
fn test_random_entry() {
let setup = Setup::new();
- let mut table = setup.table();
+ let table = setup.table();
let entries = setup.entries();
let entry = table.random_entry(ALL_ENTRY);
- assert!(matches!(entry, Some(&_)));
+ assert!(matches!(entry, Some(_)));
let entry = table.random_entry(CONNECTED_ENTRY);
assert!(matches!(entry, None));
@@ -442,7 +453,7 @@ mod tests {
#[test]
fn test_add_entries() {
let setup = Setup::new();
- let mut table = setup.table();
+ let table = setup.table();
let key = [
0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,