aboutsummaryrefslogtreecommitdiff
path: root/karyons_p2p/src/routing_table
diff options
context:
space:
mode:
Diffstat (limited to 'karyons_p2p/src/routing_table')
-rw-r--r--karyons_p2p/src/routing_table/bucket.rs123
-rw-r--r--karyons_p2p/src/routing_table/entry.rs41
-rw-r--r--karyons_p2p/src/routing_table/mod.rs461
3 files changed, 0 insertions, 625 deletions
diff --git a/karyons_p2p/src/routing_table/bucket.rs b/karyons_p2p/src/routing_table/bucket.rs
deleted file mode 100644
index 13edd24..0000000
--- a/karyons_p2p/src/routing_table/bucket.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-use super::{Entry, Key};
-
-use rand::{rngs::OsRng, seq::SliceRandom};
-
-/// BITFLAGS represent the status of an Entry within a bucket.
-pub type EntryStatusFlag = u16;
-
-/// The entry is connected.
-pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001;
-
-/// The entry is disconnected. This will increase the failure counter.
-pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010;
-
-/// The entry is ready to reconnect, meaning it has either been added and
-/// has no connection attempts, or it has been refreshed.
-pub const PENDING_ENTRY: EntryStatusFlag = 0b00100;
-
-/// The entry is unreachable. This will increase the failure counter.
-pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000;
-
-/// The entry is unstable. This will increase the failure counter.
-pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000;
-
-#[allow(dead_code)]
-pub const ALL_ENTRY: EntryStatusFlag = 0b11111;
-
-/// A BucketEntry represents a peer in the routing table.
-#[derive(Clone, Debug)]
-pub struct BucketEntry {
- pub status: EntryStatusFlag,
- pub entry: Entry,
- pub failures: u32,
- pub last_seen: i64,
-}
-
-impl BucketEntry {
- pub fn is_connected(&self) -> bool {
- self.status ^ CONNECTED_ENTRY == 0
- }
-
- pub fn is_unreachable(&self) -> bool {
- self.status ^ UNREACHABLE_ENTRY == 0
- }
-
- pub fn is_unstable(&self) -> bool {
- self.status ^ UNSTABLE_ENTRY == 0
- }
-}
-
-/// The number of entries that can be stored within a single bucket.
-pub const BUCKET_SIZE: usize = 20;
-
-/// A Bucket represents a group of entries in the routing table.
-#[derive(Debug, Clone)]
-pub struct Bucket {
- entries: Vec<BucketEntry>,
-}
-
-impl Bucket {
- /// Creates a new empty Bucket
- pub fn new() -> Self {
- Self {
- entries: Vec::with_capacity(BUCKET_SIZE),
- }
- }
-
- /// Add an entry to the bucket.
- pub fn add(&mut self, entry: &Entry) {
- self.entries.push(BucketEntry {
- status: PENDING_ENTRY,
- entry: entry.clone(),
- failures: 0,
- last_seen: chrono::Utc::now().timestamp(),
- })
- }
-
- /// Get the number of entries in the bucket.
- pub fn len(&self) -> usize {
- self.entries.len()
- }
-
- /// Returns an iterator over the entries in the bucket.
- pub fn iter(&self) -> impl Iterator<Item = &BucketEntry> {
- self.entries.iter()
- }
-
- /// Remove an entry.
- pub fn remove(&mut self, key: &Key) {
- let position = self.entries.iter().position(|e| &e.entry.key == key);
- if let Some(i) = position {
- self.entries.remove(i);
- }
- }
-
- /// Returns an iterator of entries in random order.
- pub fn random_iter(&self, amount: usize) -> impl Iterator<Item = &BucketEntry> {
- self.entries.choose_multiple(&mut OsRng, amount)
- }
-
- /// Updates the status of an entry in the bucket identified by the given key.
- ///
- /// If the key is not found in the bucket, no action is taken.
- ///
- /// This will also update the last_seen field and increase the failures
- /// counter for the bucket entry according to the new status.
- pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
- if let Some(e) = self.entries.iter_mut().find(|e| &e.entry.key == key) {
- e.status = entry_flag;
- if e.is_unreachable() || e.is_unstable() {
- e.failures += 1;
- }
-
- if !e.is_unreachable() {
- e.last_seen = chrono::Utc::now().timestamp();
- }
- }
- }
-
- /// Check if the bucket contains the given key.
- pub fn contains_key(&self, key: &Key) -> bool {
- self.entries.iter().any(|e| &e.entry.key == key)
- }
-}
diff --git a/karyons_p2p/src/routing_table/entry.rs b/karyons_p2p/src/routing_table/entry.rs
deleted file mode 100644
index b3f219f..0000000
--- a/karyons_p2p/src/routing_table/entry.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use bincode::{Decode, Encode};
-
-use karyons_net::{Addr, Port};
-
-/// Specifies the size of the key, in bytes.
-pub const KEY_SIZE: usize = 32;
-
-/// An Entry represents a peer in the routing table.
-#[derive(Encode, Decode, Clone, Debug)]
-pub struct Entry {
- /// The unique key identifying the peer.
- pub key: Key,
- /// The IP address of the peer.
- pub addr: Addr,
- /// TCP port
- pub port: Port,
- /// UDP/TCP port
- pub discovery_port: Port,
-}
-
-impl PartialEq for Entry {
- fn eq(&self, other: &Self) -> bool {
- // XXX this should also compare both addresses (the self.addr == other.addr)
- self.key == other.key
- }
-}
-
-/// The unique key identifying the peer.
-pub type Key = [u8; KEY_SIZE];
-
-/// Calculates the XOR distance between two provided keys.
-///
-/// The XOR distance is a metric used in Kademlia to measure the closeness
-/// of keys.
-pub fn xor_distance(key: &Key, other: &Key) -> Key {
- let mut res = [0; 32];
- for (i, (k, o)) in key.iter().zip(other.iter()).enumerate() {
- res[i] = k ^ o;
- }
- res
-}
diff --git a/karyons_p2p/src/routing_table/mod.rs b/karyons_p2p/src/routing_table/mod.rs
deleted file mode 100644
index abf9a08..0000000
--- a/karyons_p2p/src/routing_table/mod.rs
+++ /dev/null
@@ -1,461 +0,0 @@
-mod bucket;
-mod entry;
-pub use bucket::{
- Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
- UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
-};
-pub use entry::{xor_distance, Entry, Key};
-
-use rand::{rngs::OsRng, seq::SliceRandom};
-
-use crate::utils::subnet_match;
-
-use bucket::BUCKET_SIZE;
-use entry::KEY_SIZE;
-
-/// The total number of buckets in the routing table.
-const TABLE_SIZE: usize = 32;
-
-/// The distance limit for the closest buckets.
-const DISTANCE_LIMIT: usize = 32;
-
-/// The maximum number of matched subnets allowed within a single bucket.
-const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1;
-
-/// The maximum number of matched subnets across the entire routing table.
-const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6;
-
-/// Represents the possible result when adding a new entry.
-#[derive(Debug)]
-pub enum AddEntryResult {
- /// The entry is added.
- Added,
- /// The entry is already exists.
- Exists,
- /// The entry is ignored.
- Ignored,
- /// The entry is restricted and not allowed.
- Restricted,
-}
-
-/// This is a modified version of the Kademlia Distributed Hash Table (DHT).
-/// https://en.wikipedia.org/wiki/Kademlia
-#[derive(Debug)]
-pub struct RoutingTable {
- key: Key,
- buckets: Vec<Bucket>,
-}
-
-impl RoutingTable {
- /// Creates a new RoutingTable
- pub fn new(key: Key) -> Self {
- let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect();
- Self { key, buckets }
- }
-
- /// Adds a new entry to the table and returns a result indicating success,
- /// failure, or restrictions.
- pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult {
- // Determine the index of the bucket where the entry should be placed.
- let bucket_idx = match self.bucket_index(&entry.key) {
- Some(i) => i,
- None => return AddEntryResult::Ignored,
- };
-
- let bucket = &self.buckets[bucket_idx];
-
- // Check if the entry already exists in the bucket.
- if bucket.contains_key(&entry.key) {
- return AddEntryResult::Exists;
- }
-
- // Check if the entry is restricted.
- if self.subnet_restricted(bucket_idx, &entry) {
- return AddEntryResult::Restricted;
- }
-
- let bucket = &mut self.buckets[bucket_idx];
-
- // If the bucket has free space, add the entry and return success.
- if bucket.len() < BUCKET_SIZE {
- bucket.add(&entry);
- return AddEntryResult::Added;
- }
-
- // If the bucket is full, the entry is ignored.
- AddEntryResult::Ignored
- }
-
- /// Check if the table contains the given key.
- pub fn contains_key(&self, key: &Key) -> bool {
- // Determine the bucket index for the given key.
- let bucket_idx = match self.bucket_index(key) {
- Some(bi) => bi,
- None => return false,
- };
-
- let bucket = &self.buckets[bucket_idx];
- bucket.contains_key(key)
- }
-
- /// Updates the status of an entry in the routing table identified
- /// by the given key.
- ///
- /// If the key is not found, no action is taken.
- pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
- // Determine the bucket index for the given key.
- let bucket_idx = match self.bucket_index(key) {
- Some(bi) => bi,
- None => return,
- };
-
- let bucket = &mut self.buckets[bucket_idx];
- bucket.update_entry(key, entry_flag);
- }
-
- /// Returns a list of bucket indexes that are closest to the given target key.
- pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> {
- let mut indexes = vec![];
-
- // Determine the primary bucket index for the target key.
- let bucket_idx = self.bucket_index(target_key).unwrap_or(0);
-
- indexes.push(bucket_idx);
-
- // Add additional bucket indexes within a certain distance limit.
- for i in 1..DISTANCE_LIMIT {
- if bucket_idx >= i && bucket_idx - i >= 1 {
- indexes.push(bucket_idx - i);
- }
-
- if bucket_idx + i < (TABLE_SIZE - 1) {
- indexes.push(bucket_idx + i);
- }
- }
-
- indexes
- }
-
- /// Returns a list of the closest entries to the given target key, limited by max_entries.
- pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
- let mut entries: Vec<Entry> = vec![];
-
- // Collect entries
- 'outer: for idx in self.bucket_indexes(target_key) {
- let bucket = &self.buckets[idx];
- for bucket_entry in bucket.iter() {
- if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
- continue;
- }
-
- entries.push(bucket_entry.entry.clone());
- if entries.len() == max_entries {
- break 'outer;
- }
- }
- }
-
- // Sort the entries by their distance to the target key.
- entries.sort_by(|a, b| {
- xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key))
- });
-
- entries
- }
-
- /// Removes an entry with the given key from the routing table, if it exists.
- pub fn remove_entry(&mut self, key: &Key) {
- // Determine the bucket index for the given key.
- let bucket_idx = match self.bucket_index(key) {
- Some(bi) => bi,
- None => return,
- };
-
- let bucket = &mut self.buckets[bucket_idx];
- bucket.remove(key);
- }
-
- /// Returns an iterator of entries.
- pub fn iter(&self) -> impl Iterator<Item = &Bucket> {
- self.buckets.iter()
- }
-
- /// Returns a random entry from the routing table.
- pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> {
- for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) {
- for entry in bucket.random_iter(bucket.len()) {
- if entry.status & entry_flag == 0 {
- continue;
- }
- return Some(&entry.entry);
- }
- }
-
- None
- }
-
- // Returns the bucket index for a given key in the table.
- fn bucket_index(&self, key: &Key) -> Option<usize> {
- // Calculate the XOR distance between the self key and the provided key.
- let distance = xor_distance(&self.key, key);
-
- for (i, b) in distance.iter().enumerate() {
- if *b != 0 {
- let lz = i * 8 + b.leading_zeros() as usize;
- let bits = KEY_SIZE * 8 - 1;
- let idx = (bits - lz) / 8;
- return Some(idx);
- }
- }
- None
- }
-
- /// This function iterate through the routing table and counts how many
- /// entries in the same subnet as the given Entry are already present.
- ///
- /// If the number of matching entries in the same bucket exceeds a
- /// threshold (MAX_MATCHED_SUBNET_IN_BUCKET), or if the total count of
- /// matching entries in the entire table exceeds a threshold
- /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry
- /// is considered restricted and returns true.
- fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool {
- let mut bucket_count = 0;
- let mut table_count = 0;
-
- // Iterate through the routing table's buckets and entries to check
- // for subnet matches.
- for (i, bucket) in self.buckets.iter().enumerate() {
- for e in bucket.iter() {
- // If there is a subnet match, update the counts.
- let matched = subnet_match(&e.entry.addr, &entry.addr);
- if matched {
- if i == idx {
- bucket_count += 1;
- }
- table_count += 1;
- }
-
- // If the number of matched entries in the same bucket exceeds
- // the limit, return true
- if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET {
- return true;
- }
- }
-
- // If the total matched entries in the table exceed the limit,
- // return true.
- if table_count >= MAX_MATCHED_SUBNET_IN_TABLE {
- return true;
- }
- }
-
- // If no subnet restrictions are encountered, return false.
- false
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::bucket::ALL_ENTRY;
- use super::*;
-
- use karyons_net::Addr;
-
- struct Setup {
- local_key: Key,
- keys: Vec<Key>,
- }
-
- fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry {
- Entry {
- key: key.clone(),
- addr: addr.clone(),
- port,
- discovery_port,
- }
- }
-
- impl Setup {
- fn new() -> Self {
- let keys = vec![
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 1,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 1, 1, 0, 1, 1, 2,
- ],
- [
- 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 3,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0,
- 0, 0, 0, 0, 0, 4,
- ],
- [
- 223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 5,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0,
- 0, 0, 0, 0, 0, 6,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0,
- 0, 0, 0, 0, 0, 0, 7,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0,
- 0, 0, 0, 0, 0, 0, 8,
- ],
- [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0,
- 0, 0, 0, 0, 0, 0, 9,
- ],
- ];
-
- Self {
- local_key: [
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0,
- ],
- keys,
- }
- }
-
- fn entries(&self) -> Vec<Entry> {
- let mut entries = vec![];
- for (i, key) in self.keys.iter().enumerate() {
- entries.push(new_entry(
- key,
- &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
- 3000,
- 3010,
- ));
- }
- entries
- }
-
- fn table(&self) -> RoutingTable {
- let mut table = RoutingTable::new(self.local_key.clone());
-
- for entry in self.entries() {
- let res = table.add_entry(entry);
- assert!(matches!(res, AddEntryResult::Added));
- }
-
- table
- }
- }
-
- #[test]
- fn test_bucket_index() {
- let setup = Setup::new();
- let table = setup.table();
-
- assert_eq!(table.bucket_index(&setup.local_key), None);
- assert_eq!(table.bucket_index(&setup.keys[0]), Some(0));
- assert_eq!(table.bucket_index(&setup.keys[1]), Some(5));
- assert_eq!(table.bucket_index(&setup.keys[2]), Some(26));
- assert_eq!(table.bucket_index(&setup.keys[3]), Some(11));
- assert_eq!(table.bucket_index(&setup.keys[4]), Some(31));
- assert_eq!(table.bucket_index(&setup.keys[5]), Some(11));
- assert_eq!(table.bucket_index(&setup.keys[6]), Some(12));
- assert_eq!(table.bucket_index(&setup.keys[7]), Some(13));
- assert_eq!(table.bucket_index(&setup.keys[8]), Some(14));
- }
-
- #[test]
- fn test_closest_entries() {
- let setup = Setup::new();
- let table = setup.table();
- let entries = setup.entries();
-
- assert_eq!(
- table.closest_entries(&setup.keys[5], 8),
- vec![
- entries[5].clone(),
- entries[3].clone(),
- entries[1].clone(),
- entries[6].clone(),
- entries[7].clone(),
- entries[8].clone(),
- entries[2].clone(),
- ]
- );
-
- assert_eq!(
- table.closest_entries(&setup.keys[4], 2),
- vec![entries[4].clone(), entries[2].clone()]
- );
- }
-
- #[test]
- fn test_random_entry() {
- let setup = Setup::new();
- let mut table = setup.table();
- let entries = setup.entries();
-
- let entry = table.random_entry(ALL_ENTRY);
- assert!(matches!(entry, Some(&_)));
-
- let entry = table.random_entry(CONNECTED_ENTRY);
- assert!(matches!(entry, None));
-
- for entry in entries {
- table.remove_entry(&entry.key);
- }
-
- let entry = table.random_entry(ALL_ENTRY);
- assert!(matches!(entry, None));
- }
-
- #[test]
- fn test_add_entries() {
- let setup = Setup::new();
- let mut table = setup.table();
-
- let key = [
- 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 5,
- ];
-
- let key2 = [
- 0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 5,
- ];
-
- let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010);
- assert!(matches!(
- table.add_entry(entry1.clone()),
- AddEntryResult::Added
- ));
-
- assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists));
-
- let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010);
- assert!(matches!(
- table.add_entry(entry2),
- AddEntryResult::Restricted
- ));
-
- let mut key: [u8; 32] = [0; 32];
-
- for i in 0..BUCKET_SIZE {
- key[i] += 1;
- let entry = new_entry(
- &key,
- &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
- 3000,
- 3010,
- );
- table.add_entry(entry);
- }
-
- key[BUCKET_SIZE] += 1;
- let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010);
- assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored));
- }
-}