diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-05-23 15:43:04 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-05-23 15:43:04 +0200 | 
| commit | 998568ab76cc8ba36fe47d5fca17bcc997aa391c (patch) | |
| tree | eb6e1239498f8d7179eb2e3e0b74c6396bcea51b | |
| parent | c9785e8cc5b6a9a722ba0aff1eb33c2dbf020f2e (diff) | |
p2p: wrap the buckets with mutex in RoutingTable
| -rw-r--r-- | Cargo.lock | 2 | ||||
| -rw-r--r-- | p2p/Cargo.toml | 1 | ||||
| -rw-r--r-- | p2p/examples/monitor.rs | 2 | ||||
| -rw-r--r-- | p2p/src/discovery/lookup.rs | 28 | ||||
| -rw-r--r-- | p2p/src/discovery/mod.rs | 53 | ||||
| -rw-r--r-- | p2p/src/discovery/refresh.rs | 23 | ||||
| -rw-r--r-- | p2p/src/routing_table/mod.rs | 69 | 
7 files changed, 75 insertions, 103 deletions
| @@ -1241,6 +1241,7 @@ dependencies = [   "ed25519-dalek",   "log",   "once_cell", + "parking_lot",   "pin-project-lite",   "rand",   "smol", @@ -1322,6 +1323,7 @@ dependencies = [   "karyon_jsonrpc",   "karyon_net",   "log", + "parking_lot",   "rand",   "rcgen 0.12.1",   "rustls-pki-types", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 22236e3..fe46953 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -48,6 +48,7 @@ futures-rustls = { version = "0.25.1", features = [  ], optional = true }  tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true }  rustls-pki-types = "1.7.0" +parking_lot = "0.12.2"  [dev-dependencies]  async-std = "1.12.0" diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 1629207..cda8972 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,6 +1,6 @@  mod shared; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc;  use clap::Parser;  use log::error; diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 4a06083..3beea7e 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,10 +5,7 @@ use log::{error, trace};  use rand::{rngs::OsRng, seq::SliceRandom, RngCore};  use karyon_core::{ -    async_runtime::{ -        lock::{Mutex, RwLock}, -        Executor, -    }, +    async_runtime::{lock::RwLock, Executor},      async_util::timeout,      crypto::KeyPair,      util::decode, @@ -38,7 +35,7 @@ pub struct LookupService {      id: PeerID,      /// Routing Table -    table: Arc<Mutex<RoutingTable>>, +    table: Arc<RoutingTable>,      /// Listener      listener: Arc<Listener>, @@ -63,7 +60,7 @@ impl LookupService {      pub fn new(          key_pair: &KeyPair,          id: &PeerID, -        table: Arc<Mutex<RoutingTable>>, +        table: Arc<RoutingTable>,          config: Arc<Config>,          monitor: Arc<Monitor>,          ex: Executor, @@ -158,12 +155,10 @@ impl LookupService {              }          } -        let mut table = self.table.lock().await;          for peer in peer_buffer.iter() { -            let result = table.add_entry(peer.clone().into()); +            let result = self.table.add_entry(peer.clone().into());              trace!("Add entry {:?}", result);          } -        drop(table);          self.monitor              .notify(DiscoveryEvent::LookupSucceeded( @@ -190,11 +185,10 @@ impl LookupService {                  .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)                  .await?; -            let table = self.table.lock().await;              for peer in peers {                  if random_peers.contains(&peer)                      || peer.peer_id == self.id -                    || table.contains_key(&peer.peer_id.0) +                    || self.table.contains_key(&peer.peer_id.0)                  {                      continue;                  } @@ -233,10 +227,6 @@ impl LookupService {          target_peer_id: &PeerID,      ) -> Result<Vec<PeerMsg>> {          let conn = self.connector.connect(&endpoint, &peer_id).await?; -        self.monitor -            .notify(ConnEvent::Connected(endpoint.clone())) -            .await; -          let result = self.handle_outbound(conn, target_peer_id).await;          self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; @@ -318,7 +308,7 @@ impl LookupService {                  }                  NetMsgCmd::Peer => {                      let (peer, _) = decode::<PeerMsg>(&msg.payload)?; -                    let result = self.table.lock().await.add_entry(peer.clone().into()); +                    let result = self.table.add_entry(peer.clone().into());                      trace!("Add entry result: {:?}", result);                  }                  c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), @@ -381,9 +371,9 @@ impl LookupService {      /// Sends a Peers msg.      async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> {          trace!("Send Peers msg"); -        let table = self.table.lock().await; -        let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); -        drop(table); +        let entries = self +            .table +            .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);          let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();          conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?) diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 19ae77a..64e5c14 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -7,7 +7,7 @@ use log::{error, info};  use rand::{rngs::OsRng, seq::SliceRandom};  use karyon_core::{ -    async_runtime::{lock::Mutex, Executor}, +    async_runtime::Executor,      async_util::{Backoff, TaskGroup, TaskResult},      crypto::KeyPair,  }; @@ -22,8 +22,8 @@ use crate::{      message::NetMsg,      monitor::Monitor,      routing_table::{ -        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, -        INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, +        RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, +        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,      },      slots::ConnectionSlots,      Error, PeerID, Result, @@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;  pub struct Discovery {      /// Routing table -    table: Arc<Mutex<RoutingTable>>, +    table: Arc<RoutingTable>,      /// Lookup Service      lookup_service: Arc<LookupService>, @@ -80,7 +80,7 @@ impl Discovery {          let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));          let table_key = peer_id.0; -        let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); +        let table = Arc::new(RoutingTable::new(table_key));          let refresh_service =              RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); @@ -201,8 +201,7 @@ impl Discovery {      async fn connect_loop(self: Arc<Self>) -> Result<()> {          let backoff = Backoff::new(500, self.config.seeding_interval * 1000);          loop { -            let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; -            match random_table_entry { +            match self.table.random_entry(PENDING_ENTRY) {                  Some(entry) => {                      backoff.reset();                      let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -233,18 +232,13 @@ impl Discovery {              match result {                  Err(Error::IncompatiblePeer) => {                      error!("Failed to do handshake: {endpoint_c} incompatible peer"); -                    selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; +                    selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);                  }                  Err(Error::PeerAlreadyConnected) => { -                    // TODO: Use an appropriate status. -                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; -                } -                Err(_) => { -                    selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; -                } -                Ok(_) => { -                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; +                    selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)                  } +                Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), +                Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),              }              Ok(()) @@ -257,12 +251,8 @@ impl Discovery {          if let Some(pid) = &pid {              match result { -                Ok(_) => { -                    self.update_table_entry(pid, CONNECTED_ENTRY).await; -                } -                Err(_) => { -                    self.update_table_entry(pid, UNREACHABLE_ENTRY).await; -                } +                Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY), +                Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),              }          }      } @@ -274,16 +264,12 @@ impl Discovery {      /// table doesn't have an available entry, it will connect to one of the      /// provided bootstrap endpoints in the `Config` and initiate the lookup.      async fn start_seeding(&self) { -        match self -            .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) -            .await -        { +        match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {              Some(entry) => {                  let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);                  let peer_id = Some(entry.key.into());                  if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { -                    self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) -                        .await; +                    self.table.update_entry(&entry.key, UNSTABLE_ENTRY);                      error!("Failed to do lookup: {endpoint}: {err}");                  }              } @@ -297,15 +283,4 @@ impl Discovery {              }          }      } - -    /// Returns a random entry from routing table. -    async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { -        self.table.lock().await.random_entry(entry_flag).cloned() -    } - -    /// Update the entry status   -    async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { -        let table = &mut self.table.lock().await; -        table.update_entry(&pid.0, entry_flag); -    }  } diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 745a5d5..eec6743 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -5,10 +5,7 @@ use log::{error, info, trace};  use rand::{rngs::OsRng, RngCore};  use karyon_core::{ -    async_runtime::{ -        lock::{Mutex, RwLock}, -        Executor, -    }, +    async_runtime::{lock::RwLock, Executor},      async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},  }; @@ -33,7 +30,7 @@ pub struct PongMsg(pub [u8; 32]);  pub struct RefreshService {      /// Routing table -    table: Arc<Mutex<RoutingTable>>, +    table: Arc<RoutingTable>,      /// Resolved listen endpoint      listen_endpoint: Option<RwLock<Endpoint>>, @@ -55,7 +52,7 @@ impl RefreshService {      /// Creates a new refresh service      pub fn new(          config: Arc<Config>, -        table: Arc<Mutex<RoutingTable>>, +        table: Arc<RoutingTable>,          monitor: Arc<Monitor>,          executor: Executor,      ) -> Self { @@ -122,7 +119,7 @@ impl RefreshService {              self.monitor.notify(DiscoveryEvent::RefreshStarted).await;              let mut entries: Vec<BucketEntry> = vec![]; -            for bucket in self.table.lock().await.iter() { +            for bucket in self.table.buckets() {                  for entry in bucket                      .iter()                      .filter(|e| !e.is_connected() && !e.is_incompatible()) @@ -145,10 +142,7 @@ impl RefreshService {              let mut tasks = Vec::new();              for bucket_entry in chunk {                  if bucket_entry.failures >= MAX_FAILURES { -                    self.table -                        .lock() -                        .await -                        .remove_entry(&bucket_entry.entry.key); +                    self.table.remove_entry(&bucket_entry.entry.key);                      continue;                  } @@ -167,16 +161,15 @@ impl RefreshService {          let key = &bucket_entry.entry.key;          match self.connect(&bucket_entry.entry).await {              Ok(_) => { -                self.table.lock().await.update_entry(key, PENDING_ENTRY); +                self.table.update_entry(key, PENDING_ENTRY);              }              Err(err) => {                  trace!("Failed to refresh entry {:?}: {err}", key); -                let table = &mut self.table.lock().await;                  if bucket_entry.failures >= MAX_FAILURES { -                    table.remove_entry(key); +                    self.table.remove_entry(key);                      return;                  } -                table.update_entry(key, UNREACHABLE_ENTRY); +                self.table.update_entry(key, UNREACHABLE_ENTRY);              }          }      } diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index bbf4801..49ddac5 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -1,18 +1,20 @@ -use std::net::IpAddr; -  mod bucket;  mod entry; +use std::net::IpAddr; + +use parking_lot::RwLock; + +use rand::{rngs::OsRng, seq::SliceRandom}; + +use karyon_net::Addr; +  pub use bucket::{      Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,      PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,  };  pub use entry::{xor_distance, Entry, Key}; -use rand::{rngs::OsRng, seq::SliceRandom}; - -use karyon_net::Addr; -  use bucket::BUCKET_SIZE;  use entry::KEY_SIZE; @@ -46,29 +48,30 @@ pub enum AddEntryResult {  #[derive(Debug)]  pub struct RoutingTable {      key: Key, -    buckets: Vec<Bucket>, +    buckets: RwLock<Vec<Bucket>>,  }  impl RoutingTable {      /// Creates a new RoutingTable      pub fn new(key: Key) -> Self {          let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect(); -        Self { key, buckets } +        Self { +            key, +            buckets: RwLock::new(buckets), +        }      }      /// Adds a new entry to the table and returns a result indicating success,      /// failure, or restrictions. -    pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult { +    pub fn add_entry(&self, entry: Entry) -> AddEntryResult {          // Determine the index of the bucket where the entry should be placed.          let bucket_idx = match self.bucket_index(&entry.key) {              Some(i) => i,              None => return AddEntryResult::Ignored,          }; -        let bucket = &self.buckets[bucket_idx]; -          // Check if the entry already exists in the bucket. -        if bucket.contains_key(&entry.key) { +        if self.contains_key(&entry.key) {              return AddEntryResult::Exists;          } @@ -77,7 +80,8 @@ impl RoutingTable {              return AddEntryResult::Restricted;          } -        let bucket = &mut self.buckets[bucket_idx]; +        let mut buckets = self.buckets.write(); +        let bucket = &mut buckets[bucket_idx];          // If the bucket has free space, add the entry and return success.          if bucket.len() < BUCKET_SIZE { @@ -99,13 +103,14 @@ impl RoutingTable {      /// Check if the table contains the given key.      pub fn contains_key(&self, key: &Key) -> bool { +        let buckets = self.buckets.read();          // Determine the bucket index for the given key.          let bucket_idx = match self.bucket_index(key) {              Some(bi) => bi,              None => return false,          }; -        let bucket = &self.buckets[bucket_idx]; +        let bucket = &buckets[bucket_idx];          bucket.contains_key(key)      } @@ -113,14 +118,15 @@ impl RoutingTable {      /// by the given key.      ///      /// If the key is not found, no action is taken. -    pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { +    pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) { +        let mut buckets = self.buckets.write();          // Determine the bucket index for the given key.          let bucket_idx = match self.bucket_index(key) {              Some(bi) => bi,              None => return,          }; -        let bucket = &mut self.buckets[bucket_idx]; +        let bucket = &mut buckets[bucket_idx];          bucket.update_entry(key, entry_flag);      } @@ -149,11 +155,12 @@ impl RoutingTable {      /// Returns a list of the closest entries to the given target key, limited by max_entries.      pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> { +        let buckets = self.buckets.read();          let mut entries: Vec<Entry> = vec![];          // Collect entries          'outer: for idx in self.bucket_indexes(target_key) { -            let bucket = &self.buckets[idx]; +            let bucket = &buckets[idx];              for bucket_entry in bucket.iter() {                  if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {                      continue; @@ -175,30 +182,33 @@ impl RoutingTable {      }      /// Removes an entry with the given key from the routing table, if it exists. -    pub fn remove_entry(&mut self, key: &Key) { +    pub fn remove_entry(&self, key: &Key) { +        let mut buckets = self.buckets.write();          // Determine the bucket index for the given key.          let bucket_idx = match self.bucket_index(key) {              Some(bi) => bi,              None => return,          }; -        let bucket = &mut self.buckets[bucket_idx]; +        let bucket = &mut buckets[bucket_idx];          bucket.remove(key);      }      /// Returns an iterator of entries. -    pub fn iter(&self) -> impl Iterator<Item = &Bucket> { -        self.buckets.iter() +    /// FIXME: TODO: avoid cloning the data +    pub fn buckets(&self) -> Vec<Bucket> { +        self.buckets.read().clone()      }      /// Returns a random entry from the routing table. -    pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> { -        for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) { +    pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { +        let buckets = self.buckets.read(); +        for bucket in buckets.choose_multiple(&mut OsRng, buckets.len()) {              for entry in bucket.random_iter(bucket.len()) {                  if entry.status & entry_flag == 0 {                      continue;                  } -                return Some(&entry.entry); +                return Some(entry.entry.clone());              }          } @@ -230,12 +240,13 @@ impl RoutingTable {      /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry      /// is considered restricted and returns true.      fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool { +        let buckets = self.buckets.read();          let mut bucket_count = 0;          let mut table_count = 0;          // Iterate through the routing table's buckets and entries to check          // for subnet matches. -        for (i, bucket) in self.buckets.iter().enumerate() { +        for (i, bucket) in buckets.iter().enumerate() {              for e in bucket.iter() {                  // If there is a subnet match, update the counts.                  let matched = subnet_match(&e.entry.addr, &entry.addr); @@ -366,7 +377,7 @@ mod tests {          }          fn table(&self) -> RoutingTable { -            let mut table = RoutingTable::new(self.local_key.clone()); +            let table = RoutingTable::new(self.local_key.clone());              for entry in self.entries() {                  let res = table.add_entry(entry); @@ -422,11 +433,11 @@ mod tests {      #[test]      fn test_random_entry() {          let setup = Setup::new(); -        let mut table = setup.table(); +        let table = setup.table();          let entries = setup.entries();          let entry = table.random_entry(ALL_ENTRY); -        assert!(matches!(entry, Some(&_))); +        assert!(matches!(entry, Some(_)));          let entry = table.random_entry(CONNECTED_ENTRY);          assert!(matches!(entry, None)); @@ -442,7 +453,7 @@ mod tests {      #[test]      fn test_add_entries() {          let setup = Setup::new(); -        let mut table = setup.table(); +        let table = setup.table();          let key = [              0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, | 
