aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/routing_table/mod.rs
blob: cfc3128ace14ac8117c4ee5064ff2cde9e84c218 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
use std::net::IpAddr;

mod bucket;
mod entry;

pub use bucket::{
    Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
    PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
};
pub use entry::{xor_distance, Entry, Key};

use rand::{rngs::OsRng, seq::SliceRandom};

use karyons_net::Addr;

use bucket::BUCKET_SIZE;
use entry::KEY_SIZE;

/// The total number of buckets in the routing table.
///
/// `RoutingTable::new` allocates exactly this many buckets.
const TABLE_SIZE: usize = 32;

/// The distance limit for the closest buckets.
///
/// `RoutingTable::bucket_indexes` only looks at buckets whose offset from the
/// primary bucket is strictly smaller than this value.
const DISTANCE_LIMIT: usize = 32;

/// The maximum number of matched subnets allowed within a single bucket.
///
/// A new entry is restricted once the number of entries of its target bucket
/// that share its subnet reaches this value.
const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1;

/// The maximum number of matched subnets across the entire routing table.
///
/// A new entry is restricted once the number of entries of the whole table
/// that share its subnet reaches this value.
const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6;

/// Represents the possible outcomes of [`RoutingTable::add_entry`].
#[derive(Debug)]
pub enum AddEntryResult {
    /// The entry was added, possibly by replacing an incompatible entry of a
    /// full bucket.
    Added,
    /// An entry with the same key already exists in the target bucket.
    Exists,
    /// The entry was ignored: either its key maps to no bucket, or the target
    /// bucket is full and holds no incompatible entry that could be replaced.
    Ignored,
    /// The entry was rejected by the subnet restrictions.
    Restricted,
}

/// This is a modified version of the Kademlia Distributed Hash Table (DHT).
/// <https://en.wikipedia.org/wiki/Kademlia>
///
/// Entries are grouped into buckets according to the XOR distance between
/// their key and the table's own key.
#[derive(Debug)]
pub struct RoutingTable {
    /// The local key; every distance is computed relative to it.
    key: Key,
    /// The buckets of the table (`TABLE_SIZE` of them once built by `new`).
    buckets: Vec<Bucket>,
}

impl RoutingTable {
    /// Builds an empty routing table for the node identified by `key`.
    ///
    /// The table starts out with `TABLE_SIZE` empty buckets.
    pub fn new(key: Key) -> Self {
        let buckets: Vec<Bucket> = std::iter::repeat_with(Bucket::new)
            .take(TABLE_SIZE)
            .collect();
        Self { key, buckets }
    }

    /// Tries to insert `entry` into the bucket its key maps to.
    ///
    /// The outcome is reported through [`AddEntryResult`]:
    /// - `Ignored` when the key maps to no bucket, or when the bucket is full
    ///   and contains no incompatible entry to evict,
    /// - `Exists` when the bucket already holds the key,
    /// - `Restricted` when the subnet limits reject the entry,
    /// - `Added` otherwise.
    pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult {
        // Find the bucket responsible for this key.
        let bucket_idx = match self.bucket_index(&entry.key) {
            Some(idx) => idx,
            None => return AddEntryResult::Ignored,
        };

        // Duplicates are reported, never overwritten.
        if self.buckets[bucket_idx].contains_key(&entry.key) {
            return AddEntryResult::Exists;
        }

        // Enforce the per-bucket and per-table subnet limits.
        if self.subnet_restricted(bucket_idx, &entry) {
            return AddEntryResult::Restricted;
        }

        let bucket = &mut self.buckets[bucket_idx];

        if bucket.len() >= BUCKET_SIZE {
            // The bucket is full: room can only be made by evicting an
            // incompatible entry. Without one the new entry is dropped.
            let evicted_key = bucket
                .iter()
                .find(|e| e.is_incompatible())
                .map(|e| e.entry.key.clone());

            match evicted_key {
                Some(key) => {
                    bucket.remove(&key);
                }
                None => return AddEntryResult::Ignored,
            }
        }

        bucket.add(&entry);
        AddEntryResult::Added
    }

    /// Reports whether an entry with the given key is stored in the table.
    ///
    /// A key that maps to no bucket is never contained.
    pub fn contains_key(&self, key: &Key) -> bool {
        self.bucket_index(key)
            .map_or(false, |idx| self.buckets[idx].contains_key(key))
    }

    /// Forwards a status update for the entry identified by `key` to the
    /// bucket responsible for that key.
    ///
    /// Nothing happens when the key maps to no bucket.
    pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) {
        if let Some(idx) = self.bucket_index(key) {
            self.buckets[idx].update_entry(key, entry_flag);
        }
    }

    /// Lists the bucket indexes to inspect when looking for entries close to
    /// `target_key`.
    ///
    /// The first element is the bucket the key maps to (bucket `0` when it
    /// maps to none). It is followed, for each offset in
    /// `1..DISTANCE_LIMIT`, by the lower and then the upper neighbour at that
    /// offset.
    ///
    /// NOTE: bucket `0` and the last bucket are never emitted as neighbours;
    /// they only show up when they are the primary bucket.
    pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> {
        let primary = self.bucket_index(target_key).unwrap_or(0);
        let mut indexes = vec![primary];

        for offset in 1..DISTANCE_LIMIT {
            // Lower neighbour, never going down to bucket 0.
            if primary > offset {
                indexes.push(primary - offset);
            }

            // Upper neighbour, never going up to the last bucket.
            let upper = primary + offset;
            if upper + 1 < TABLE_SIZE {
                indexes.push(upper);
            }
        }

        indexes
    }

    /// Returns at most `max_entries` entries taken from the buckets nearest to
    /// `target_key`, sorted by ascending XOR distance to that key.
    ///
    /// Buckets are visited in the order given by `bucket_indexes` and the
    /// collection stops as soon as `max_entries` entries were gathered; only
    /// the gathered entries are then sorted. Entries flagged as unreachable
    /// or unstable are skipped.
    ///
    /// A `max_entries` of zero yields an empty list.
    pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
        // With a limit of zero the "limit reached" check below could never
        // fire after a push, which would return every candidate instead of none.
        if max_entries == 0 {
            return Vec::new();
        }

        let mut entries: Vec<Entry> = Vec::new();

        // Collect candidates bucket by bucket until the limit is reached.
        'outer: for idx in self.bucket_indexes(target_key) {
            for bucket_entry in self.buckets[idx].iter() {
                if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
                    continue;
                }

                entries.push(bucket_entry.entry.clone());
                if entries.len() >= max_entries {
                    break 'outer;
                }
            }
        }

        // Sort the collected entries by their distance to the target key.
        entries.sort_by(|a, b| {
            xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key))
        });

        entries
    }

    /// Asks the bucket responsible for `key` to remove the matching entry.
    ///
    /// Nothing happens when the key maps to no bucket.
    pub fn remove_entry(&mut self, key: &Key) {
        if let Some(idx) = self.bucket_index(key) {
            self.buckets[idx].remove(key);
        }
    }

    /// Returns an iterator over the buckets of the routing table, in bucket
    /// index order.
    ///
    /// Note that the items are whole [`Bucket`]s, not individual entries.
    pub fn iter(&self) -> impl Iterator<Item = &Bucket> {
        self.buckets.iter()
    }

    /// Picks an entry whose status shares at least one bit with `entry_flag`.
    ///
    /// Buckets are visited in a random order drawn from `OsRng`, and each
    /// bucket is scanned through its own `random_iter`. The first matching
    /// entry wins; `None` is returned when no entry matches.
    pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> {
        for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) {
            let hit = bucket
                .random_iter(bucket.len())
                .into_iter()
                .find(|candidate| candidate.status & entry_flag != 0);

            if let Some(bucket_entry) = hit {
                return Some(&bucket_entry.entry);
            }
        }

        None
    }

    /// Maps `key` to the index of the bucket responsible for it.
    ///
    /// The index is derived from the number of leading zero bits of the XOR
    /// distance between the local key and `key`, divided down to one bucket
    /// per 8 bits. Returns `None` when the distance has no non-zero element.
    fn bucket_index(&self, key: &Key) -> Option<usize> {
        let distance = xor_distance(&self.key, key);
        let highest_bit = KEY_SIZE * 8 - 1;

        distance
            .iter()
            .enumerate()
            // The first non-zero element holds the most significant set bit.
            .find(|(_, byte)| **byte != 0)
            .map(|(pos, byte)| {
                let leading_zeros = pos * 8 + byte.leading_zeros() as usize;
                (highest_bit - leading_zeros) / 8
            })
    }

    /// Decides whether adding `entry` to the bucket at `idx` is forbidden by
    /// the subnet limits.
    ///
    /// The table is scanned for entries whose address is in the same subnet
    /// as `entry` (see [`subnet_match`]). The entry is restricted, and `true`
    /// is returned, as soon as either
    /// - the matches inside bucket `idx` reach `MAX_MATCHED_SUBNET_IN_BUCKET`, or
    /// - the matches across the whole table reach `MAX_MATCHED_SUBNET_IN_TABLE`.
    fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool {
        let mut same_bucket_matches = 0;
        let mut table_matches = 0;

        for (i, bucket) in self.buckets.iter().enumerate() {
            // Number of entries of this bucket sharing the candidate's subnet.
            let matches = bucket
                .iter()
                .filter(|e| subnet_match(&e.entry.addr, &entry.addr))
                .count();

            table_matches += matches;
            if i == idx {
                same_bucket_matches += matches;
            }

            if same_bucket_matches >= MAX_MATCHED_SUBNET_IN_BUCKET
                || table_matches >= MAX_MATCHED_SUBNET_IN_TABLE
            {
                return true;
            }
        }

        // Neither limit was reached.
        false
    }
}

/// Tells whether two addresses are considered part of the same subnet.
///
/// Only pairs of IPv4 addresses can match: they do when their first three
/// octets are equal. Two loopback addresses never match, and any other
/// combination of address kinds yields `false`.
pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool {
    if let (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) = (addr, other_addr) {
        // TODO: Consider moving this to a different place
        let both_loopback = other_ip.is_loopback() && ip.is_loopback();
        return !both_loopback && ip.octets()[..3] == other_ip.octets()[..3];
    }

    false
}

#[cfg(test)]
mod tests {
    use super::bucket::ALL_ENTRY;
    use super::*;

    use karyons_net::Addr;

    /// Shared fixture for the routing table tests.
    struct Setup {
        /// Key of the local node owning the table under test.
        local_key: Key,
        /// Keys of the remote entries inserted into the table.
        keys: Vec<Key>,
    }

    /// Builds an [`Entry`] from its parts, cloning the key and the address.
    fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry {
        Entry {
            key: key.clone(),
            addr: addr.clone(),
            port,
            discovery_port,
        }
    }

    impl Setup {
        /// Creates the fixture: an all-zero local key plus nine remote keys.
        ///
        /// The bucket noted next to each key is the one asserted by
        /// `test_bucket_index`.
        fn new() -> Self {
            let keys = vec![
                // keys[0] -> bucket 0
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 1,
                ],
                // keys[1] -> bucket 5
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    1, 1, 0, 1, 1, 2,
                ],
                // keys[2] -> bucket 26
                [
                    0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 3,
                ],
                // keys[3] -> bucket 11
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0,
                    0, 0, 0, 0, 0, 4,
                ],
                // keys[4] -> bucket 31
                [
                    223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 5,
                ],
                // keys[5] -> bucket 11
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0,
                    0, 0, 0, 0, 0, 6,
                ],
                // keys[6] -> bucket 12
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0,
                    0, 0, 0, 0, 0, 0, 7,
                ],
                // keys[7] -> bucket 13
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0,
                    0, 0, 0, 0, 0, 0, 8,
                ],
                // keys[8] -> bucket 14
                [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 9,
                ],
            ];

            Self {
                // The all-zero local key makes every XOR distance equal to
                // the remote key itself.
                local_key: [
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 0,
                ],
                keys,
            }
        }

        /// Builds one entry per fixture key, each with its own `127.0.<i>.1`
        /// address and the same pair of ports.
        fn entries(&self) -> Vec<Entry> {
            self.keys
                .iter()
                .enumerate()
                .map(|(i, key)| {
                    let addr = Addr::Ip(format!("127.0.{i}.1").parse().unwrap());
                    new_entry(key, &addr, 3000, 3010)
                })
                .collect()
        }

        /// Builds a routing table for the local key and fills it with all
        /// fixture entries, asserting that every insertion is accepted.
        fn table(&self) -> RoutingTable {
            let mut routing_table = RoutingTable::new(self.local_key.clone());

            self.entries().into_iter().for_each(|fixture| {
                let outcome = routing_table.add_entry(fixture);
                assert!(matches!(outcome, AddEntryResult::Added));
            });

            routing_table
        }
    }

    /// Checks the bucket chosen for the local key and for every fixture key.
    #[test]
    fn test_bucket_index() {
        let setup = Setup::new();
        let table = setup.table();

        // The local key has no distance to itself, hence no bucket.
        assert_eq!(table.bucket_index(&setup.local_key), None);

        // Expected bucket of each fixture key, in `setup.keys` order.
        let expected_buckets: [usize; 9] = [0, 5, 26, 11, 31, 11, 12, 13, 14];
        for (i, expected) in expected_buckets.iter().enumerate() {
            assert_eq!(table.bucket_index(&setup.keys[i]), Some(*expected));
        }
    }

    /// Checks the selection and the ordering performed by `closest_entries`.
    #[test]
    fn test_closest_entries() {
        let setup = Setup::new();
        let table = setup.table();
        let entries = setup.entries();

        // Only seven entries come back although eight were requested:
        // entries[0] (bucket 0) and entries[4] (last bucket) live in buckets
        // that `bucket_indexes` never returns as neighbours.
        assert_eq!(
            table.closest_entries(&setup.keys[5], 8),
            vec![
                entries[5].clone(),
                entries[3].clone(),
                entries[1].clone(),
                entries[6].clone(),
                entries[7].clone(),
                entries[8].clone(),
                entries[2].clone(),
            ]
        );

        // With a limit of two, collection stops after the first two entries
        // found while walking the buckets; those two are then sorted.
        assert_eq!(
            table.closest_entries(&setup.keys[4], 2),
            vec![entries[4].clone(), entries[2].clone()]
        );
    }

    /// Checks `random_entry` against a filled table, a non-matching status
    /// flag, and an emptied table.
    #[test]
    fn test_random_entry() {
        let setup = Setup::new();
        let mut table = setup.table();

        // Any entry qualifies for ALL_ENTRY.
        assert!(table.random_entry(ALL_ENTRY).is_some());

        // No fixture entry was ever marked as connected.
        assert!(table.random_entry(CONNECTED_ENTRY).is_none());

        for entry in setup.entries() {
            table.remove_entry(&entry.key);
        }

        // Nothing is left to pick once every entry was removed.
        assert!(table.random_entry(ALL_ENTRY).is_none());
    }

    /// Exercises the `Added`, `Exists`, `Restricted` and `Ignored` outcomes
    /// of `add_entry`.
    #[test]
    fn test_add_entries() {
        let setup = Setup::new();
        let mut table = setup.table();

        // `key` and `key2` first differ from the all-zero local key at the
        // same position, so both map to the same bucket.
        let key = [
            0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 5,
        ];

        let key2 = [
            0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 5,
        ];

        let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010);
        assert!(matches!(
            table.add_entry(entry1.clone()),
            AddEntryResult::Added
        ));

        // Adding the very same entry again is reported as a duplicate.
        assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists));

        // Same bucket and same first three address octets as entry1: the
        // per-bucket subnet limit rejects it.
        let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010);
        assert!(matches!(
            table.add_entry(entry2),
            AddEntryResult::Restricted
        ));

        let mut key: [u8; 32] = [0; 32];

        // Insert BUCKET_SIZE distinct keys that all have a non-zero first
        // byte. Loopback addresses never count as a subnet match, so the
        // subnet limits do not interfere. The results are not checked here.
        for i in 0..BUCKET_SIZE {
            key[i] += 1;
            let entry = new_entry(
                &key,
                &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
                3000,
                3010,
            );
            table.add_entry(entry);
        }

        // The bucket shared by these keys is expected to be full by now, so
        // one more entry for it must be ignored.
        key[BUCKET_SIZE] += 1;
        let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010);
        assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored));
    }
}