diff options
Diffstat (limited to 'p2p/src/routing_table')
-rw-r--r-- | p2p/src/routing_table/bucket.rs | 123 | ||||
-rw-r--r-- | p2p/src/routing_table/entry.rs | 41 | ||||
-rw-r--r-- | p2p/src/routing_table/mod.rs | 461 |
3 files changed, 625 insertions, 0 deletions
diff --git a/p2p/src/routing_table/bucket.rs b/p2p/src/routing_table/bucket.rs new file mode 100644 index 0000000..13edd24 --- /dev/null +++ b/p2p/src/routing_table/bucket.rs @@ -0,0 +1,123 @@ +use super::{Entry, Key}; + +use rand::{rngs::OsRng, seq::SliceRandom}; + +/// BITFLAGS represent the status of an Entry within a bucket. +pub type EntryStatusFlag = u16; + +/// The entry is connected. +pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001; + +/// The entry is disconnected. This will increase the failure counter. +pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010; + +/// The entry is ready to reconnect, meaning it has either been added and +/// has no connection attempts, or it has been refreshed. +pub const PENDING_ENTRY: EntryStatusFlag = 0b00100; + +/// The entry is unreachable. This will increase the failure counter. +pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000; + +/// The entry is unstable. This will increase the failure counter. +pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000; + +#[allow(dead_code)] +pub const ALL_ENTRY: EntryStatusFlag = 0b11111; + +/// A BucketEntry represents a peer in the routing table. +#[derive(Clone, Debug)] +pub struct BucketEntry { + pub status: EntryStatusFlag, + pub entry: Entry, + pub failures: u32, + pub last_seen: i64, +} + +impl BucketEntry { + pub fn is_connected(&self) -> bool { + self.status ^ CONNECTED_ENTRY == 0 + } + + pub fn is_unreachable(&self) -> bool { + self.status ^ UNREACHABLE_ENTRY == 0 + } + + pub fn is_unstable(&self) -> bool { + self.status ^ UNSTABLE_ENTRY == 0 + } +} + +/// The number of entries that can be stored within a single bucket. +pub const BUCKET_SIZE: usize = 20; + +/// A Bucket represents a group of entries in the routing table. +#[derive(Debug, Clone)] +pub struct Bucket { + entries: Vec<BucketEntry>, +} + +impl Bucket { + /// Creates a new empty Bucket + pub fn new() -> Self { + Self { + entries: Vec::with_capacity(BUCKET_SIZE), + } + } + + /// Add an entry to the bucket. + pub fn add(&mut self, entry: &Entry) { + self.entries.push(BucketEntry { + status: PENDING_ENTRY, + entry: entry.clone(), + failures: 0, + last_seen: chrono::Utc::now().timestamp(), + }) + } + + /// Get the number of entries in the bucket. + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns an iterator over the entries in the bucket. + pub fn iter(&self) -> impl Iterator<Item = &BucketEntry> { + self.entries.iter() + } + + /// Remove an entry. + pub fn remove(&mut self, key: &Key) { + let position = self.entries.iter().position(|e| &e.entry.key == key); + if let Some(i) = position { + self.entries.remove(i); + } + } + + /// Returns an iterator of entries in random order. + pub fn random_iter(&self, amount: usize) -> impl Iterator<Item = &BucketEntry> { + self.entries.choose_multiple(&mut OsRng, amount) + } + + /// Updates the status of an entry in the bucket identified by the given key. + /// + /// If the key is not found in the bucket, no action is taken. + /// + /// This will also update the last_seen field and increase the failures + /// counter for the bucket entry according to the new status. + pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { + if let Some(e) = self.entries.iter_mut().find(|e| &e.entry.key == key) { + e.status = entry_flag; + if e.is_unreachable() || e.is_unstable() { + e.failures += 1; + } + + if !e.is_unreachable() { + e.last_seen = chrono::Utc::now().timestamp(); + } + } + } + + /// Check if the bucket contains the given key. + pub fn contains_key(&self, key: &Key) -> bool { + self.entries.iter().any(|e| &e.entry.key == key) + } +} diff --git a/p2p/src/routing_table/entry.rs b/p2p/src/routing_table/entry.rs new file mode 100644 index 0000000..b3f219f --- /dev/null +++ b/p2p/src/routing_table/entry.rs @@ -0,0 +1,41 @@ +use bincode::{Decode, Encode}; + +use karyons_net::{Addr, Port}; + +/// Specifies the size of the key, in bytes. +pub const KEY_SIZE: usize = 32; + +/// An Entry represents a peer in the routing table. +#[derive(Encode, Decode, Clone, Debug)] +pub struct Entry { + /// The unique key identifying the peer. + pub key: Key, + /// The IP address of the peer. + pub addr: Addr, + /// TCP port + pub port: Port, + /// UDP/TCP port + pub discovery_port: Port, +} + +impl PartialEq for Entry { + fn eq(&self, other: &Self) -> bool { + // XXX this should also compare both addresses (the self.addr == other.addr) + self.key == other.key + } +} + +/// The unique key identifying the peer. +pub type Key = [u8; KEY_SIZE]; + +/// Calculates the XOR distance between two provided keys. +/// +/// The XOR distance is a metric used in Kademlia to measure the closeness +/// of keys. +pub fn xor_distance(key: &Key, other: &Key) -> Key { + let mut res = [0; 32]; + for (i, (k, o)) in key.iter().zip(other.iter()).enumerate() { + res[i] = k ^ o; + } + res +} diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs new file mode 100644 index 0000000..abf9a08 --- /dev/null +++ b/p2p/src/routing_table/mod.rs @@ -0,0 +1,461 @@ +mod bucket; +mod entry; +pub use bucket::{ + Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, + UNREACHABLE_ENTRY, UNSTABLE_ENTRY, +}; +pub use entry::{xor_distance, Entry, Key}; + +use rand::{rngs::OsRng, seq::SliceRandom}; + +use crate::utils::subnet_match; + +use bucket::BUCKET_SIZE; +use entry::KEY_SIZE; + +/// The total number of buckets in the routing table. +const TABLE_SIZE: usize = 32; + +/// The distance limit for the closest buckets. +const DISTANCE_LIMIT: usize = 32; + +/// The maximum number of matched subnets allowed within a single bucket. +const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1; + +/// The maximum number of matched subnets across the entire routing table. +const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6; + +/// Represents the possible result when adding a new entry. +#[derive(Debug)] +pub enum AddEntryResult { + /// The entry is added. + Added, + /// The entry is already exists. + Exists, + /// The entry is ignored. + Ignored, + /// The entry is restricted and not allowed. + Restricted, +} + +/// This is a modified version of the Kademlia Distributed Hash Table (DHT). +/// https://en.wikipedia.org/wiki/Kademlia +#[derive(Debug)] +pub struct RoutingTable { + key: Key, + buckets: Vec<Bucket>, +} + +impl RoutingTable { + /// Creates a new RoutingTable + pub fn new(key: Key) -> Self { + let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect(); + Self { key, buckets } + } + + /// Adds a new entry to the table and returns a result indicating success, + /// failure, or restrictions. + pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult { + // Determine the index of the bucket where the entry should be placed. + let bucket_idx = match self.bucket_index(&entry.key) { + Some(i) => i, + None => return AddEntryResult::Ignored, + }; + + let bucket = &self.buckets[bucket_idx]; + + // Check if the entry already exists in the bucket. + if bucket.contains_key(&entry.key) { + return AddEntryResult::Exists; + } + + // Check if the entry is restricted. + if self.subnet_restricted(bucket_idx, &entry) { + return AddEntryResult::Restricted; + } + + let bucket = &mut self.buckets[bucket_idx]; + + // If the bucket has free space, add the entry and return success. + if bucket.len() < BUCKET_SIZE { + bucket.add(&entry); + return AddEntryResult::Added; + } + + // If the bucket is full, the entry is ignored. + AddEntryResult::Ignored + } + + /// Check if the table contains the given key. + pub fn contains_key(&self, key: &Key) -> bool { + // Determine the bucket index for the given key. + let bucket_idx = match self.bucket_index(key) { + Some(bi) => bi, + None => return false, + }; + + let bucket = &self.buckets[bucket_idx]; + bucket.contains_key(key) + } + + /// Updates the status of an entry in the routing table identified + /// by the given key. + /// + /// If the key is not found, no action is taken. + pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { + // Determine the bucket index for the given key. + let bucket_idx = match self.bucket_index(key) { + Some(bi) => bi, + None => return, + }; + + let bucket = &mut self.buckets[bucket_idx]; + bucket.update_entry(key, entry_flag); + } + + /// Returns a list of bucket indexes that are closest to the given target key. + pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> { + let mut indexes = vec![]; + + // Determine the primary bucket index for the target key. + let bucket_idx = self.bucket_index(target_key).unwrap_or(0); + + indexes.push(bucket_idx); + + // Add additional bucket indexes within a certain distance limit. + for i in 1..DISTANCE_LIMIT { + if bucket_idx >= i && bucket_idx - i >= 1 { + indexes.push(bucket_idx - i); + } + + if bucket_idx + i < (TABLE_SIZE - 1) { + indexes.push(bucket_idx + i); + } + } + + indexes + } + + /// Returns a list of the closest entries to the given target key, limited by max_entries. + pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> { + let mut entries: Vec<Entry> = vec![]; + + // Collect entries + 'outer: for idx in self.bucket_indexes(target_key) { + let bucket = &self.buckets[idx]; + for bucket_entry in bucket.iter() { + if bucket_entry.is_unreachable() || bucket_entry.is_unstable() { + continue; + } + + entries.push(bucket_entry.entry.clone()); + if entries.len() == max_entries { + break 'outer; + } + } + } + + // Sort the entries by their distance to the target key. + entries.sort_by(|a, b| { + xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key)) + }); + + entries + } + + /// Removes an entry with the given key from the routing table, if it exists. + pub fn remove_entry(&mut self, key: &Key) { + // Determine the bucket index for the given key. + let bucket_idx = match self.bucket_index(key) { + Some(bi) => bi, + None => return, + }; + + let bucket = &mut self.buckets[bucket_idx]; + bucket.remove(key); + } + + /// Returns an iterator of entries. + pub fn iter(&self) -> impl Iterator<Item = &Bucket> { + self.buckets.iter() + } + + /// Returns a random entry from the routing table. + pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> { + for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) { + for entry in bucket.random_iter(bucket.len()) { + if entry.status & entry_flag == 0 { + continue; + } + return Some(&entry.entry); + } + } + + None + } + + // Returns the bucket index for a given key in the table. + fn bucket_index(&self, key: &Key) -> Option<usize> { + // Calculate the XOR distance between the self key and the provided key. + let distance = xor_distance(&self.key, key); + + for (i, b) in distance.iter().enumerate() { + if *b != 0 { + let lz = i * 8 + b.leading_zeros() as usize; + let bits = KEY_SIZE * 8 - 1; + let idx = (bits - lz) / 8; + return Some(idx); + } + } + None + } + + /// This function iterate through the routing table and counts how many + /// entries in the same subnet as the given Entry are already present. + /// + /// If the number of matching entries in the same bucket exceeds a + /// threshold (MAX_MATCHED_SUBNET_IN_BUCKET), or if the total count of + /// matching entries in the entire table exceeds a threshold + /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry + /// is considered restricted and returns true. + fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool { + let mut bucket_count = 0; + let mut table_count = 0; + + // Iterate through the routing table's buckets and entries to check + // for subnet matches. + for (i, bucket) in self.buckets.iter().enumerate() { + for e in bucket.iter() { + // If there is a subnet match, update the counts. + let matched = subnet_match(&e.entry.addr, &entry.addr); + if matched { + if i == idx { + bucket_count += 1; + } + table_count += 1; + } + + // If the number of matched entries in the same bucket exceeds + // the limit, return true + if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET { + return true; + } + } + + // If the total matched entries in the table exceed the limit, + // return true. + if table_count >= MAX_MATCHED_SUBNET_IN_TABLE { + return true; + } + } + + // If no subnet restrictions are encountered, return false. + false + } +} + +#[cfg(test)] +mod tests { + use super::bucket::ALL_ENTRY; + use super::*; + + use karyons_net::Addr; + + struct Setup { + local_key: Key, + keys: Vec<Key>, + } + + fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry { + Entry { + key: key.clone(), + addr: addr.clone(), + port, + discovery_port, + } + } + + impl Setup { + fn new() -> Self { + let keys = vec![ + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 1, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 1, 0, 1, 1, 2, + ], + [ + 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 3, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0, + 0, 0, 0, 0, 0, 4, + ], + [ + 223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 5, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0, + 0, 0, 0, 0, 0, 6, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0, + 0, 0, 0, 0, 0, 0, 7, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0, + 0, 0, 0, 0, 0, 0, 8, + ], + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0, + 0, 0, 0, 0, 0, 0, 9, + ], + ]; + + Self { + local_key: [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, + ], + keys, + } + } + + fn entries(&self) -> Vec<Entry> { + let mut entries = vec![]; + for (i, key) in self.keys.iter().enumerate() { + entries.push(new_entry( + key, + &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()), + 3000, + 3010, + )); + } + entries + } + + fn table(&self) -> RoutingTable { + let mut table = RoutingTable::new(self.local_key.clone()); + + for entry in self.entries() { + let res = table.add_entry(entry); + assert!(matches!(res, AddEntryResult::Added)); + } + + table + } + } + + #[test] + fn test_bucket_index() { + let setup = Setup::new(); + let table = setup.table(); + + assert_eq!(table.bucket_index(&setup.local_key), None); + assert_eq!(table.bucket_index(&setup.keys[0]), Some(0)); + assert_eq!(table.bucket_index(&setup.keys[1]), Some(5)); + assert_eq!(table.bucket_index(&setup.keys[2]), Some(26)); + assert_eq!(table.bucket_index(&setup.keys[3]), Some(11)); + assert_eq!(table.bucket_index(&setup.keys[4]), Some(31)); + assert_eq!(table.bucket_index(&setup.keys[5]), Some(11)); + assert_eq!(table.bucket_index(&setup.keys[6]), Some(12)); + assert_eq!(table.bucket_index(&setup.keys[7]), Some(13)); + assert_eq!(table.bucket_index(&setup.keys[8]), Some(14)); + } + + #[test] + fn test_closest_entries() { + let setup = Setup::new(); + let table = setup.table(); + let entries = setup.entries(); + + assert_eq!( + table.closest_entries(&setup.keys[5], 8), + vec![ + entries[5].clone(), + entries[3].clone(), + entries[1].clone(), + entries[6].clone(), + entries[7].clone(), + entries[8].clone(), + entries[2].clone(), + ] + ); + + assert_eq!( + table.closest_entries(&setup.keys[4], 2), + vec![entries[4].clone(), entries[2].clone()] + ); + } + + #[test] + fn test_random_entry() { + let setup = Setup::new(); + let mut table = setup.table(); + let entries = setup.entries(); + + let entry = table.random_entry(ALL_ENTRY); + assert!(matches!(entry, Some(&_))); + + let entry = table.random_entry(CONNECTED_ENTRY); + assert!(matches!(entry, None)); + + for entry in entries { + table.remove_entry(&entry.key); + } + + let entry = table.random_entry(ALL_ENTRY); + assert!(matches!(entry, None)); + } + + #[test] + fn test_add_entries() { + let setup = Setup::new(); + let mut table = setup.table(); + + let key = [ + 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 5, + ]; + + let key2 = [ + 0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 5, + ]; + + let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010); + assert!(matches!( + table.add_entry(entry1.clone()), + AddEntryResult::Added + )); + + assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists)); + + let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010); + assert!(matches!( + table.add_entry(entry2), + AddEntryResult::Restricted + )); + + let mut key: [u8; 32] = [0; 32]; + + for i in 0..BUCKET_SIZE { + key[i] += 1; + let entry = new_entry( + &key, + &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()), + 3000, + 3010, + ); + table.add_entry(entry); + } + + key[BUCKET_SIZE] += 1; + let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010); + assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored)); + } +} |