aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/routing_table
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/routing_table')
-rw-r--r--p2p/src/routing_table/bucket.rs123
-rw-r--r--p2p/src/routing_table/entry.rs41
-rw-r--r--p2p/src/routing_table/mod.rs461
3 files changed, 625 insertions, 0 deletions
diff --git a/p2p/src/routing_table/bucket.rs b/p2p/src/routing_table/bucket.rs
new file mode 100644
index 0000000..13edd24
--- /dev/null
+++ b/p2p/src/routing_table/bucket.rs
@@ -0,0 +1,123 @@
+use super::{Entry, Key};
+
+use rand::{rngs::OsRng, seq::SliceRandom};
+
+/// BITFLAGS represent the status of an Entry within a bucket.
+pub type EntryStatusFlag = u16;
+
+/// The entry is connected.
+pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001;
+
+/// The entry is disconnected. This will increase the failure counter.
+pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010;
+
+/// The entry is ready to reconnect, meaning it has either been added and
+/// has no connection attempts, or it has been refreshed.
+pub const PENDING_ENTRY: EntryStatusFlag = 0b00100;
+
+/// The entry is unreachable. This will increase the failure counter.
+pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000;
+
+/// The entry is unstable. This will increase the failure counter.
+pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000;
+
+#[allow(dead_code)]
+pub const ALL_ENTRY: EntryStatusFlag = 0b11111;
+
+/// A BucketEntry represents a peer in the routing table.
+#[derive(Clone, Debug)]
+pub struct BucketEntry {
+ pub status: EntryStatusFlag,
+ pub entry: Entry,
+ pub failures: u32,
+ pub last_seen: i64,
+}
+
+impl BucketEntry {
+ pub fn is_connected(&self) -> bool {
+ self.status ^ CONNECTED_ENTRY == 0
+ }
+
+ pub fn is_unreachable(&self) -> bool {
+ self.status ^ UNREACHABLE_ENTRY == 0
+ }
+
+ pub fn is_unstable(&self) -> bool {
+ self.status ^ UNSTABLE_ENTRY == 0
+ }
+}
+
+/// The number of entries that can be stored within a single bucket.
+pub const BUCKET_SIZE: usize = 20;
+
+/// A Bucket represents a group of entries in the routing table.
+#[derive(Debug, Clone)]
+pub struct Bucket {
+ entries: Vec<BucketEntry>,
+}
+
+impl Bucket {
+ /// Creates a new empty Bucket
+ pub fn new() -> Self {
+ Self {
+ entries: Vec::with_capacity(BUCKET_SIZE),
+ }
+ }
+
+ /// Add an entry to the bucket.
+ pub fn add(&mut self, entry: &Entry) {
+ self.entries.push(BucketEntry {
+ status: PENDING_ENTRY,
+ entry: entry.clone(),
+ failures: 0,
+ last_seen: chrono::Utc::now().timestamp(),
+ })
+ }
+
+ /// Get the number of entries in the bucket.
+ pub fn len(&self) -> usize {
+ self.entries.len()
+ }
+
+ /// Returns an iterator over the entries in the bucket.
+ pub fn iter(&self) -> impl Iterator<Item = &BucketEntry> {
+ self.entries.iter()
+ }
+
+ /// Remove an entry.
+ pub fn remove(&mut self, key: &Key) {
+ let position = self.entries.iter().position(|e| &e.entry.key == key);
+ if let Some(i) = position {
+ self.entries.remove(i);
+ }
+ }
+
+ /// Returns an iterator of entries in random order.
+ pub fn random_iter(&self, amount: usize) -> impl Iterator<Item = &BucketEntry> {
+ self.entries.choose_multiple(&mut OsRng, amount)
+ }
+
+ /// Updates the status of an entry in the bucket identified by the given key.
+ ///
+ /// If the key is not found in the bucket, no action is taken.
+ ///
+ /// This will also update the last_seen field and increase the failures
+ /// counter for the bucket entry according to the new status.
+ pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
+ if let Some(e) = self.entries.iter_mut().find(|e| &e.entry.key == key) {
+ e.status = entry_flag;
+ if e.is_unreachable() || e.is_unstable() {
+ e.failures += 1;
+ }
+
+ if !e.is_unreachable() {
+ e.last_seen = chrono::Utc::now().timestamp();
+ }
+ }
+ }
+
+ /// Check if the bucket contains the given key.
+ pub fn contains_key(&self, key: &Key) -> bool {
+ self.entries.iter().any(|e| &e.entry.key == key)
+ }
+}
diff --git a/p2p/src/routing_table/entry.rs b/p2p/src/routing_table/entry.rs
new file mode 100644
index 0000000..b3f219f
--- /dev/null
+++ b/p2p/src/routing_table/entry.rs
@@ -0,0 +1,41 @@
+use bincode::{Decode, Encode};
+
+use karyons_net::{Addr, Port};
+
+/// Specifies the size of the key, in bytes.
+pub const KEY_SIZE: usize = 32;
+
+/// An Entry represents a peer in the routing table.
+#[derive(Encode, Decode, Clone, Debug)]
+pub struct Entry {
+ /// The unique key identifying the peer.
+ pub key: Key,
+ /// The IP address of the peer.
+ pub addr: Addr,
+ /// TCP port
+ pub port: Port,
+ /// UDP/TCP port
+ pub discovery_port: Port,
+}
+
+impl PartialEq for Entry {
+ fn eq(&self, other: &Self) -> bool {
+ // XXX this should also compare both addresses (the self.addr == other.addr)
+ self.key == other.key
+ }
+}
+
+/// The unique key identifying the peer.
+pub type Key = [u8; KEY_SIZE];
+
+/// Calculates the XOR distance between two provided keys.
+///
+/// The XOR distance is a metric used in Kademlia to measure the closeness
+/// of keys.
+pub fn xor_distance(key: &Key, other: &Key) -> Key {
+ let mut res = [0; 32];
+ for (i, (k, o)) in key.iter().zip(other.iter()).enumerate() {
+ res[i] = k ^ o;
+ }
+ res
+}
diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs
new file mode 100644
index 0000000..abf9a08
--- /dev/null
+++ b/p2p/src/routing_table/mod.rs
@@ -0,0 +1,461 @@
+mod bucket;
+mod entry;
+pub use bucket::{
+ Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
+ UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
+};
+pub use entry::{xor_distance, Entry, Key};
+
+use rand::{rngs::OsRng, seq::SliceRandom};
+
+use crate::utils::subnet_match;
+
+use bucket::BUCKET_SIZE;
+use entry::KEY_SIZE;
+
+/// The total number of buckets in the routing table.
+const TABLE_SIZE: usize = 32;
+
+/// The distance limit for the closest buckets.
+const DISTANCE_LIMIT: usize = 32;
+
+/// The maximum number of matched subnets allowed within a single bucket.
+const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1;
+
+/// The maximum number of matched subnets across the entire routing table.
+const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6;
+
+/// Represents the possible result when adding a new entry.
+#[derive(Debug)]
+pub enum AddEntryResult {
+ /// The entry is added.
+ Added,
+ /// The entry is already exists.
+ Exists,
+ /// The entry is ignored.
+ Ignored,
+ /// The entry is restricted and not allowed.
+ Restricted,
+}
+
+/// This is a modified version of the Kademlia Distributed Hash Table (DHT).
+/// https://en.wikipedia.org/wiki/Kademlia
+#[derive(Debug)]
+pub struct RoutingTable {
+ key: Key,
+ buckets: Vec<Bucket>,
+}
+
+impl RoutingTable {
+ /// Creates a new RoutingTable
+ pub fn new(key: Key) -> Self {
+ let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect();
+ Self { key, buckets }
+ }
+
+ /// Adds a new entry to the table and returns a result indicating success,
+ /// failure, or restrictions.
+ pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult {
+ // Determine the index of the bucket where the entry should be placed.
+ let bucket_idx = match self.bucket_index(&entry.key) {
+ Some(i) => i,
+ None => return AddEntryResult::Ignored,
+ };
+
+ let bucket = &self.buckets[bucket_idx];
+
+ // Check if the entry already exists in the bucket.
+ if bucket.contains_key(&entry.key) {
+ return AddEntryResult::Exists;
+ }
+
+ // Check if the entry is restricted.
+ if self.subnet_restricted(bucket_idx, &entry) {
+ return AddEntryResult::Restricted;
+ }
+
+ let bucket = &mut self.buckets[bucket_idx];
+
+ // If the bucket has free space, add the entry and return success.
+ if bucket.len() < BUCKET_SIZE {
+ bucket.add(&entry);
+ return AddEntryResult::Added;
+ }
+
+ // If the bucket is full, the entry is ignored.
+ AddEntryResult::Ignored
+ }
+
+ /// Check if the table contains the given key.
+ pub fn contains_key(&self, key: &Key) -> bool {
+ // Determine the bucket index for the given key.
+ let bucket_idx = match self.bucket_index(key) {
+ Some(bi) => bi,
+ None => return false,
+ };
+
+ let bucket = &self.buckets[bucket_idx];
+ bucket.contains_key(key)
+ }
+
+ /// Updates the status of an entry in the routing table identified
+ /// by the given key.
+ ///
+ /// If the key is not found, no action is taken.
+ pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
+ // Determine the bucket index for the given key.
+ let bucket_idx = match self.bucket_index(key) {
+ Some(bi) => bi,
+ None => return,
+ };
+
+ let bucket = &mut self.buckets[bucket_idx];
+ bucket.update_entry(key, entry_flag);
+ }
+
+ /// Returns a list of bucket indexes that are closest to the given target key.
+ pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> {
+ let mut indexes = vec![];
+
+ // Determine the primary bucket index for the target key.
+ let bucket_idx = self.bucket_index(target_key).unwrap_or(0);
+
+ indexes.push(bucket_idx);
+
+ // Add additional bucket indexes within a certain distance limit.
+ for i in 1..DISTANCE_LIMIT {
+ if bucket_idx >= i && bucket_idx - i >= 1 {
+ indexes.push(bucket_idx - i);
+ }
+
+ if bucket_idx + i < (TABLE_SIZE - 1) {
+ indexes.push(bucket_idx + i);
+ }
+ }
+
+ indexes
+ }
+
+ /// Returns a list of the closest entries to the given target key, limited by max_entries.
+ pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
+ let mut entries: Vec<Entry> = vec![];
+
+ // Collect entries
+ 'outer: for idx in self.bucket_indexes(target_key) {
+ let bucket = &self.buckets[idx];
+ for bucket_entry in bucket.iter() {
+ if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
+ continue;
+ }
+
+ entries.push(bucket_entry.entry.clone());
+ if entries.len() == max_entries {
+ break 'outer;
+ }
+ }
+ }
+
+ // Sort the entries by their distance to the target key.
+ entries.sort_by(|a, b| {
+ xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key))
+ });
+
+ entries
+ }
+
+ /// Removes an entry with the given key from the routing table, if it exists.
+ pub fn remove_entry(&mut self, key: &Key) {
+ // Determine the bucket index for the given key.
+ let bucket_idx = match self.bucket_index(key) {
+ Some(bi) => bi,
+ None => return,
+ };
+
+ let bucket = &mut self.buckets[bucket_idx];
+ bucket.remove(key);
+ }
+
+ /// Returns an iterator of entries.
+ pub fn iter(&self) -> impl Iterator<Item = &Bucket> {
+ self.buckets.iter()
+ }
+
+ /// Returns a random entry from the routing table.
+ pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> {
+ for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) {
+ for entry in bucket.random_iter(bucket.len()) {
+ if entry.status & entry_flag == 0 {
+ continue;
+ }
+ return Some(&entry.entry);
+ }
+ }
+
+ None
+ }
+
+ // Returns the bucket index for a given key in the table.
+ fn bucket_index(&self, key: &Key) -> Option<usize> {
+ // Calculate the XOR distance between the self key and the provided key.
+ let distance = xor_distance(&self.key, key);
+
+ for (i, b) in distance.iter().enumerate() {
+ if *b != 0 {
+ let lz = i * 8 + b.leading_zeros() as usize;
+ let bits = KEY_SIZE * 8 - 1;
+ let idx = (bits - lz) / 8;
+ return Some(idx);
+ }
+ }
+ None
+ }
+
+ /// This function iterate through the routing table and counts how many
+ /// entries in the same subnet as the given Entry are already present.
+ ///
+ /// If the number of matching entries in the same bucket exceeds a
+ /// threshold (MAX_MATCHED_SUBNET_IN_BUCKET), or if the total count of
+ /// matching entries in the entire table exceeds a threshold
+ /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry
+ /// is considered restricted and returns true.
+ fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool {
+ let mut bucket_count = 0;
+ let mut table_count = 0;
+
+ // Iterate through the routing table's buckets and entries to check
+ // for subnet matches.
+ for (i, bucket) in self.buckets.iter().enumerate() {
+ for e in bucket.iter() {
+ // If there is a subnet match, update the counts.
+ let matched = subnet_match(&e.entry.addr, &entry.addr);
+ if matched {
+ if i == idx {
+ bucket_count += 1;
+ }
+ table_count += 1;
+ }
+
+ // If the number of matched entries in the same bucket exceeds
+ // the limit, return true
+ if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET {
+ return true;
+ }
+ }
+
+ // If the total matched entries in the table exceed the limit,
+ // return true.
+ if table_count >= MAX_MATCHED_SUBNET_IN_TABLE {
+ return true;
+ }
+ }
+
+ // If no subnet restrictions are encountered, return false.
+ false
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::bucket::ALL_ENTRY;
+ use super::*;
+
+ use karyons_net::Addr;
+
+ struct Setup {
+ local_key: Key,
+ keys: Vec<Key>,
+ }
+
+ fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry {
+ Entry {
+ key: key.clone(),
+ addr: addr.clone(),
+ port,
+ discovery_port,
+ }
+ }
+
+ impl Setup {
+ fn new() -> Self {
+ let keys = vec![
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 1,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 1, 1, 0, 1, 1, 2,
+ ],
+ [
+ 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 3,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0,
+ 0, 0, 0, 0, 0, 4,
+ ],
+ [
+ 223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 5,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0,
+ 0, 0, 0, 0, 0, 6,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0,
+ 0, 0, 0, 0, 0, 0, 7,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0,
+ 0, 0, 0, 0, 0, 0, 8,
+ ],
+ [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0,
+ 0, 0, 0, 0, 0, 0, 9,
+ ],
+ ];
+
+ Self {
+ local_key: [
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0,
+ ],
+ keys,
+ }
+ }
+
+ fn entries(&self) -> Vec<Entry> {
+ let mut entries = vec![];
+ for (i, key) in self.keys.iter().enumerate() {
+ entries.push(new_entry(
+ key,
+ &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
+ 3000,
+ 3010,
+ ));
+ }
+ entries
+ }
+
+ fn table(&self) -> RoutingTable {
+ let mut table = RoutingTable::new(self.local_key.clone());
+
+ for entry in self.entries() {
+ let res = table.add_entry(entry);
+ assert!(matches!(res, AddEntryResult::Added));
+ }
+
+ table
+ }
+ }
+
+ #[test]
+ fn test_bucket_index() {
+ let setup = Setup::new();
+ let table = setup.table();
+
+ assert_eq!(table.bucket_index(&setup.local_key), None);
+ assert_eq!(table.bucket_index(&setup.keys[0]), Some(0));
+ assert_eq!(table.bucket_index(&setup.keys[1]), Some(5));
+ assert_eq!(table.bucket_index(&setup.keys[2]), Some(26));
+ assert_eq!(table.bucket_index(&setup.keys[3]), Some(11));
+ assert_eq!(table.bucket_index(&setup.keys[4]), Some(31));
+ assert_eq!(table.bucket_index(&setup.keys[5]), Some(11));
+ assert_eq!(table.bucket_index(&setup.keys[6]), Some(12));
+ assert_eq!(table.bucket_index(&setup.keys[7]), Some(13));
+ assert_eq!(table.bucket_index(&setup.keys[8]), Some(14));
+ }
+
+ #[test]
+ fn test_closest_entries() {
+ let setup = Setup::new();
+ let table = setup.table();
+ let entries = setup.entries();
+
+ assert_eq!(
+ table.closest_entries(&setup.keys[5], 8),
+ vec![
+ entries[5].clone(),
+ entries[3].clone(),
+ entries[1].clone(),
+ entries[6].clone(),
+ entries[7].clone(),
+ entries[8].clone(),
+ entries[2].clone(),
+ ]
+ );
+
+ assert_eq!(
+ table.closest_entries(&setup.keys[4], 2),
+ vec![entries[4].clone(), entries[2].clone()]
+ );
+ }
+
+ #[test]
+ fn test_random_entry() {
+ let setup = Setup::new();
+ let mut table = setup.table();
+ let entries = setup.entries();
+
+ let entry = table.random_entry(ALL_ENTRY);
+ assert!(matches!(entry, Some(&_)));
+
+ let entry = table.random_entry(CONNECTED_ENTRY);
+ assert!(matches!(entry, None));
+
+ for entry in entries {
+ table.remove_entry(&entry.key);
+ }
+
+ let entry = table.random_entry(ALL_ENTRY);
+ assert!(matches!(entry, None));
+ }
+
+ #[test]
+ fn test_add_entries() {
+ let setup = Setup::new();
+ let mut table = setup.table();
+
+ let key = [
+ 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 5,
+ ];
+
+ let key2 = [
+ 0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 5,
+ ];
+
+ let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010);
+ assert!(matches!(
+ table.add_entry(entry1.clone()),
+ AddEntryResult::Added
+ ));
+
+ assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists));
+
+ let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010);
+ assert!(matches!(
+ table.add_entry(entry2),
+ AddEntryResult::Restricted
+ ));
+
+ let mut key: [u8; 32] = [0; 32];
+
+ for i in 0..BUCKET_SIZE {
+ key[i] += 1;
+ let entry = new_entry(
+ &key,
+ &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
+ 3000,
+ 3010,
+ );
+ table.add_entry(entry);
+ }
+
+ key[BUCKET_SIZE] += 1;
+ let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010);
+ assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored));
+ }
+}