From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 15:43:04 +0200 Subject: p2p: wrap the buckets with mutex in RoutingTable --- p2p/src/routing_table/mod.rs | 69 +++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 29 deletions(-) (limited to 'p2p/src/routing_table') diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index bbf4801..49ddac5 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -1,18 +1,20 @@ -use std::net::IpAddr; - mod bucket; mod entry; +use std::net::IpAddr; + +use parking_lot::RwLock; + +use rand::{rngs::OsRng, seq::SliceRandom}; + +use karyon_net::Addr; + pub use bucket::{ Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, }; pub use entry::{xor_distance, Entry, Key}; -use rand::{rngs::OsRng, seq::SliceRandom}; - -use karyon_net::Addr; - use bucket::BUCKET_SIZE; use entry::KEY_SIZE; @@ -46,29 +48,30 @@ pub enum AddEntryResult { #[derive(Debug)] pub struct RoutingTable { key: Key, - buckets: Vec, + buckets: RwLock>, } impl RoutingTable { /// Creates a new RoutingTable pub fn new(key: Key) -> Self { let buckets: Vec = (0..TABLE_SIZE).map(|_| Bucket::new()).collect(); - Self { key, buckets } + Self { + key, + buckets: RwLock::new(buckets), + } } /// Adds a new entry to the table and returns a result indicating success, /// failure, or restrictions. - pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult { + pub fn add_entry(&self, entry: Entry) -> AddEntryResult { // Determine the index of the bucket where the entry should be placed. let bucket_idx = match self.bucket_index(&entry.key) { Some(i) => i, None => return AddEntryResult::Ignored, }; - let bucket = &self.buckets[bucket_idx]; - // Check if the entry already exists in the bucket. - if bucket.contains_key(&entry.key) { + if self.contains_key(&entry.key) { return AddEntryResult::Exists; } @@ -77,7 +80,8 @@ impl RoutingTable { return AddEntryResult::Restricted; } - let bucket = &mut self.buckets[bucket_idx]; + let mut buckets = self.buckets.write(); + let bucket = &mut buckets[bucket_idx]; // If the bucket has free space, add the entry and return success. if bucket.len() < BUCKET_SIZE { @@ -99,13 +103,14 @@ impl RoutingTable { /// Check if the table contains the given key. pub fn contains_key(&self, key: &Key) -> bool { + let buckets = self.buckets.read(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return false, }; - let bucket = &self.buckets[bucket_idx]; + let bucket = &buckets[bucket_idx]; bucket.contains_key(key) } @@ -113,14 +118,15 @@ impl RoutingTable { /// by the given key. /// /// If the key is not found, no action is taken. - pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { + pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) { + let mut buckets = self.buckets.write(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return, }; - let bucket = &mut self.buckets[bucket_idx]; + let bucket = &mut buckets[bucket_idx]; bucket.update_entry(key, entry_flag); } @@ -149,11 +155,12 @@ impl RoutingTable { /// Returns a list of the closest entries to the given target key, limited by max_entries. pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec { + let buckets = self.buckets.read(); let mut entries: Vec = vec![]; // Collect entries 'outer: for idx in self.bucket_indexes(target_key) { - let bucket = &self.buckets[idx]; + let bucket = &buckets[idx]; for bucket_entry in bucket.iter() { if bucket_entry.is_unreachable() || bucket_entry.is_unstable() { continue; @@ -175,30 +182,33 @@ impl RoutingTable { } /// Removes an entry with the given key from the routing table, if it exists. - pub fn remove_entry(&mut self, key: &Key) { + pub fn remove_entry(&self, key: &Key) { + let mut buckets = self.buckets.write(); // Determine the bucket index for the given key. let bucket_idx = match self.bucket_index(key) { Some(bi) => bi, None => return, }; - let bucket = &mut self.buckets[bucket_idx]; + let bucket = &mut buckets[bucket_idx]; bucket.remove(key); } /// Returns an iterator of entries. - pub fn iter(&self) -> impl Iterator { - self.buckets.iter() + /// FIXME: TODO: avoid cloning the data + pub fn buckets(&self) -> Vec { + self.buckets.read().clone() } /// Returns a random entry from the routing table. - pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> { - for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) { + pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option { + let buckets = self.buckets.read(); + for bucket in buckets.choose_multiple(&mut OsRng, buckets.len()) { for entry in bucket.random_iter(bucket.len()) { if entry.status & entry_flag == 0 { continue; } - return Some(&entry.entry); + return Some(entry.entry.clone()); } } @@ -230,12 +240,13 @@ impl RoutingTable { /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry /// is considered restricted and returns true. fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool { + let buckets = self.buckets.read(); let mut bucket_count = 0; let mut table_count = 0; // Iterate through the routing table's buckets and entries to check // for subnet matches. - for (i, bucket) in self.buckets.iter().enumerate() { + for (i, bucket) in buckets.iter().enumerate() { for e in bucket.iter() { // If there is a subnet match, update the counts. let matched = subnet_match(&e.entry.addr, &entry.addr); @@ -366,7 +377,7 @@ mod tests { } fn table(&self) -> RoutingTable { - let mut table = RoutingTable::new(self.local_key.clone()); + let table = RoutingTable::new(self.local_key.clone()); for entry in self.entries() { let res = table.add_entry(entry); @@ -422,11 +433,11 @@ mod tests { #[test] fn test_random_entry() { let setup = Setup::new(); - let mut table = setup.table(); + let table = setup.table(); let entries = setup.entries(); let entry = table.random_entry(ALL_ENTRY); - assert!(matches!(entry, Some(&_))); + assert!(matches!(entry, Some(_))); let entry = table.random_entry(CONNECTED_ENTRY); assert!(matches!(entry, None)); @@ -442,7 +453,7 @@ mod tests { #[test] fn test_add_entries() { let setup = Setup::new(); - let mut table = setup.table(); + let table = setup.table(); let key = [ 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -- cgit v1.2.3