From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 15:43:04 +0200 Subject: p2p: wrap the buckets with mutex in RoutingTable --- Cargo.lock | 2 ++ p2p/Cargo.toml | 1 + p2p/examples/monitor.rs | 2 +- p2p/src/discovery/lookup.rs | 28 ++++++------------ p2p/src/discovery/mod.rs | 53 +++++++++------------------------- p2p/src/discovery/refresh.rs | 23 +++++---------- p2p/src/routing_table/mod.rs | 69 +++++++++++++++++++++++++------------------- 7 files changed, 75 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81f0311..8659a33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1241,6 +1241,7 @@ dependencies = [ "ed25519-dalek", "log", "once_cell", + "parking_lot", "pin-project-lite", "rand", "smol", @@ -1322,6 +1323,7 @@ dependencies = [ "karyon_jsonrpc", "karyon_net", "log", + "parking_lot", "rand", "rcgen 0.12.1", "rustls-pki-types", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 22236e3..fe46953 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -48,6 +48,7 @@ futures-rustls = { version = "0.25.1", features = [ ], optional = true } tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true } rustls-pki-types = "1.7.0" +parking_lot = "0.12.2" [dev-dependencies] async-std = "1.12.0" diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 1629207..cda8972 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,6 +1,6 @@ mod shared; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use clap::Parser; use log::error; diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 4a06083..3beea7e 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,10 +5,7 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::timeout, crypto::KeyPair, util::decode, @@ -38,7 +35,7 @@ pub struct LookupService { id: PeerID, /// Routing Table - table: Arc>, + table: Arc, /// Listener listener: Arc, @@ -63,7 +60,7 @@ impl LookupService { pub fn new( key_pair: &KeyPair, id: &PeerID, - table: Arc>, + table: Arc, config: Arc, monitor: Arc, ex: Executor, @@ -158,12 +155,10 @@ impl LookupService { } } - let mut table = self.table.lock().await; for peer in peer_buffer.iter() { - let result = table.add_entry(peer.clone().into()); + let result = self.table.add_entry(peer.clone().into()); trace!("Add entry {:?}", result); } - drop(table); self.monitor .notify(DiscoveryEvent::LookupSucceeded( @@ -190,11 +185,10 @@ impl LookupService { .connect(endpoint.clone(), peer_id.clone(), &random_peer_id) .await?; - let table = self.table.lock().await; for peer in peers { if random_peers.contains(&peer) || peer.peer_id == self.id - || table.contains_key(&peer.peer_id.0) + || self.table.contains_key(&peer.peer_id.0) { continue; } @@ -233,10 +227,6 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result> { let conn = self.connector.connect(&endpoint, &peer_id).await?; - self.monitor - .notify(ConnEvent::Connected(endpoint.clone())) - .await; - let result = self.handle_outbound(conn, target_peer_id).await; self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; @@ -318,7 +308,7 @@ impl LookupService { } NetMsgCmd::Peer => { let (peer, _) = decode::(&msg.payload)?; - let result = self.table.lock().await.add_entry(peer.clone().into()); + let result = self.table.add_entry(peer.clone().into()); trace!("Add entry result: {:?}", result); } c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), @@ -381,9 +371,9 @@ impl LookupService { /// Sends a Peers msg. async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn) -> Result<()> { trace!("Send Peers msg"); - let table = self.table.lock().await; - let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); - drop(table); + let entries = self + .table + .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); let peers: Vec = entries.into_iter().map(|e| e.into()).collect(); conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?) diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 19ae77a..64e5c14 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -7,7 +7,7 @@ use log::{error, info}; use rand::{rngs::OsRng, seq::SliceRandom}; use karyon_core::{ - async_runtime::{lock::Mutex, Executor}, + async_runtime::Executor, async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; @@ -22,8 +22,8 @@ use crate::{ message::NetMsg, monitor::Monitor, routing_table::{ - Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, - INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, + RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, + UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }, slots::ConnectionSlots, Error, PeerID, Result, @@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc; pub struct Discovery { /// Routing table - table: Arc>, + table: Arc, /// Lookup Service lookup_service: Arc, @@ -80,7 +80,7 @@ impl Discovery { let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); let table_key = peer_id.0; - let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); + let table = Arc::new(RoutingTable::new(table_key)); let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); @@ -201,8 +201,7 @@ impl Discovery { async fn connect_loop(self: Arc) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { - let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; - match random_table_entry { + match self.table.random_entry(PENDING_ENTRY) { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -233,18 +232,13 @@ impl Discovery { match result { Err(Error::IncompatiblePeer) => { error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; + selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); } Err(Error::PeerAlreadyConnected) => { - // TODO: Use an appropriate status. - selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; - } - Err(_) => { - selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; - } - Ok(_) => { - selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; + selfc.table.update_entry(&pid.0, CONNECTED_ENTRY) } + Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), + Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY), } Ok(()) @@ -257,12 +251,8 @@ impl Discovery { if let Some(pid) = &pid { match result { - Ok(_) => { - self.update_table_entry(pid, CONNECTED_ENTRY).await; - } - Err(_) => { - self.update_table_entry(pid, UNREACHABLE_ENTRY).await; - } + Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY), + Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY), } } } @@ -274,16 +264,12 @@ impl Discovery { /// table doesn't have an available entry, it will connect to one of the /// provided bootstrap endpoints in the `Config` and initiate the lookup. async fn start_seeding(&self) { - match self - .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) - .await - { + match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); let peer_id = Some(entry.key.into()); if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { - self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) - .await; + self.table.update_entry(&entry.key, UNSTABLE_ENTRY); error!("Failed to do lookup: {endpoint}: {err}"); } } @@ -297,15 +283,4 @@ impl Discovery { } } } - - /// Returns a random entry from routing table. - async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option { - self.table.lock().await.random_entry(entry_flag).cloned() - } - - /// Update the entry status - async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { - let table = &mut self.table.lock().await; - table.update_entry(&pid.0, entry_flag); - } } diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 745a5d5..eec6743 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -5,10 +5,7 @@ use log::{error, info, trace}; use rand::{rngs::OsRng, RngCore}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult}, }; @@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]); pub struct RefreshService { /// Routing table - table: Arc>, + table: Arc, /// Resolved listen endpoint listen_endpoint: Option>, @@ -55,7 +52,7 @@ impl RefreshService { /// Creates a new refresh service pub fn new( config: Arc, - table: Arc>, + table: Arc, monitor: Arc, executor: Executor, ) -> Self { @@ -122,7 +119,7 @@ impl RefreshService { self.monitor.notify(DiscoveryEvent::RefreshStarted).await; let mut entries: Vec = vec![]; - for bucket in self.table.lock().await.iter() { + for bucket in self.table.buckets() { for entry in bucket .iter() .filter(|e| !e.is_connected() && !e.is_incompatible()) @@ -145,10 +142,7 @@ impl RefreshService { let mut tasks = Vec::new(); for bucket_entry in chunk { if bucket_entry.failures >= MAX_FAILURES { - self.table - .lock() - .await - .remove_entry(&bucket_entry.entry.key); + self.table.remove_entry(&bucket_entry.entry.key); continue; } @@ -167,16 +161,15 @@ impl RefreshService { let key = &bucket_entry.entry.key; match self.connect(&bucket_entry.entry).await { Ok(_) => { - self.table.lock().await.update_entry(key, PENDING_ENTRY); + self.table.update_entry(key, PENDING_ENTRY); } Err(err) => { trace!("Failed to refresh entry {:?}: {err}", key); - let table = &mut self.table.lock().await; if bucket_entry.failures >= MAX_FAILURES { - table.remove_entry(key); + self.table.remove_entry(key); return; } - table.update_entry(key, UNREACHABLE_ENTRY); + self.table.update_entry(key, UNREACHABLE_ENTRY); } } } diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index bbf4801..49ddac5 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -1,18 +1,20 @@ -use std::net::IpAddr; - mod bucket; mod entry; +use std::net::IpAddr; + +use parking_lot::RwLock; + +use rand::{rngs::OsRng, seq::SliceRandom}; + +use karyon_net::Addr; + pub use bucket::{ Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }; pub use entry::{xor_distance, Entry, Key}; -use rand::{rngs::OsRng, seq::SliceRandom}; - -use karyon_net::Addr; - use bucket::BUCKET_SIZE; use entry::KEY_SIZE; @@ -46,29 +48,30 @@ pub enum AddEntryResult { #[derive(Debug)] pub struct RoutingTable { key: Key, - buckets: Vec, + buckets: RwLock>, } impl RoutingTable { /// Creates a new RoutingTable pub fn new(key: Key) -> Self { let buckets: Vec = (0..TABLE_SIZE).map(|_| Bucket::new()).collect(); - Self { key, buckets } + Self { + key, + buckets: RwLock::new(buckets), + } } /// Adds a new entry to the table and returns a result indicating success, /// failure, or restrictions. - pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult { + pub fn add_entry(&self, entry: Entry) -> AddEntryResult { // Determine the index of the bucket where the entry should be placed. let bucket_idx = match self.bucket_index(&entry.key) { Some(i) => i, None => return AddEntryResult::Ignored, }; - let bucket = &self.buckets[bucket_idx]; - // Check if the entry already exists in the bucket. - if bucket.contains_key(&entry.key) { + if self.contains_key(&entry.key) { return AddEntryResult::Exists; } @@ -77,7 +80,8 @@ impl RoutingTable { return AddEntryResult::Restricted; } - let bucket = &mut self.buckets[bucket_idx]; + let mut buckets = self.buckets.write(); + let bucket = &mut buckets[bucket_idx]; // If the bucket has free space, add the entry and return success. if bucket.len() < BUCKET_SIZE { @@ -99,13 +103,14 @@ impl RoutingTable { /// Check if the table contains the given key. pub fn contains_key(&self, key: &Key) -> bool { + let buckets = self.buckets.read(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return false, }; - let bucket = &self.buckets[bucket_idx]; + let bucket = &buckets[bucket_idx]; bucket.contains_key(key) } @@ -113,14 +118,15 @@ impl RoutingTable { /// by the given key. /// /// If the key is not found, no action is taken. - pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { + pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) { + let mut buckets = self.buckets.write(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return, }; - let bucket = &mut self.buckets[bucket_idx]; + let bucket = &mut buckets[bucket_idx]; bucket.update_entry(key, entry_flag); } @@ -149,11 +155,12 @@ impl RoutingTable { /// Returns a list of the closest entries to the given target key, limited by max_entries. pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec { + let buckets = self.buckets.read(); let mut entries: Vec = vec![]; // Collect entries 'outer: for idx in self.bucket_indexes(target_key) { - let bucket = &self.buckets[idx]; + let bucket = &buckets[idx]; for bucket_entry in bucket.iter() { if bucket_entry.is_unreachable() || bucket_entry.is_unstable() { continue; @@ -175,30 +182,33 @@ impl RoutingTable { } /// Removes an entry with the given key from the routing table, if it exists. - pub fn remove_entry(&mut self, key: &Key) { + pub fn remove_entry(&self, key: &Key) { + let mut buckets = self.buckets.write(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return, }; - let bucket = &mut self.buckets[bucket_idx]; + let bucket = &mut buckets[bucket_idx]; bucket.remove(key); } /// Returns an iterator of entries. - pub fn iter(&self) -> impl Iterator { - self.buckets.iter() + /// FIXME: TODO: avoid cloning the data + pub fn buckets(&self) -> Vec { + self.buckets.read().clone() } /// Returns a random entry from the routing table. - pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> { - for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) { + pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option { + let buckets = self.buckets.read(); + for bucket in buckets.choose_multiple(&mut OsRng, buckets.len()) { for entry in bucket.random_iter(bucket.len()) { if entry.status & entry_flag == 0 { continue; } - return Some(&entry.entry); + return Some(entry.entry.clone()); } } @@ -230,12 +240,13 @@ impl RoutingTable { /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry /// is considered restricted and returns true. fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool { + let buckets = self.buckets.read(); let mut bucket_count = 0; let mut table_count = 0; // Iterate through the routing table's buckets and entries to check // for subnet matches. - for (i, bucket) in self.buckets.iter().enumerate() { + for (i, bucket) in buckets.iter().enumerate() { for e in bucket.iter() { // If there is a subnet match, update the counts. let matched = subnet_match(&e.entry.addr, &entry.addr); @@ -366,7 +377,7 @@ mod tests { } fn table(&self) -> RoutingTable { - let mut table = RoutingTable::new(self.local_key.clone()); + let table = RoutingTable::new(self.local_key.clone()); for entry in self.entries() { let res = table.add_entry(entry); @@ -422,11 +433,11 @@ mod tests { #[test] fn test_random_entry() { let setup = Setup::new(); - let mut table = setup.table(); + let table = setup.table(); let entries = setup.entries(); let entry = table.random_entry(ALL_ENTRY); - assert!(matches!(entry, Some(&_))); + assert!(matches!(entry, Some(_))); let entry = table.random_entry(CONNECTED_ENTRY); assert!(matches!(entry, None)); @@ -442,7 +453,7 @@ mod tests { #[test] fn test_add_entries() { let setup = Setup::new(); - let mut table = setup.table(); + let table = setup.table(); let key = [ 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -- cgit v1.2.3