diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-05-23 15:43:04 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-05-23 15:43:04 +0200 | 
| commit | 998568ab76cc8ba36fe47d5fca17bcc997aa391c (patch) | |
| tree | eb6e1239498f8d7179eb2e3e0b74c6396bcea51b /p2p/src/discovery/mod.rs | |
| parent | c9785e8cc5b6a9a722ba0aff1eb33c2dbf020f2e (diff) | |
p2p: wrap the buckets with mutex in RoutingTable
Diffstat (limited to 'p2p/src/discovery/mod.rs')
| -rw-r--r-- | p2p/src/discovery/mod.rs | 53 | 
1 files changed, 14 insertions, 39 deletions
| diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 19ae77a..64e5c14 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -7,7 +7,7 @@ use log::{error, info};  use rand::{rngs::OsRng, seq::SliceRandom};  use karyon_core::{ -    async_runtime::{lock::Mutex, Executor}, +    async_runtime::Executor,      async_util::{Backoff, TaskGroup, TaskResult},      crypto::KeyPair,  }; @@ -22,8 +22,8 @@ use crate::{      message::NetMsg,      monitor::Monitor,      routing_table::{ -        Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, -        INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, +        RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, +        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,      },      slots::ConnectionSlots,      Error, PeerID, Result, @@ -36,7 +36,7 @@ pub type ArcDiscovery = Arc<Discovery>;  pub struct Discovery {      /// Routing table -    table: Arc<Mutex<RoutingTable>>, +    table: Arc<RoutingTable>,      /// Lookup Service      lookup_service: Arc<LookupService>, @@ -80,7 +80,7 @@ impl Discovery {          let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));          let table_key = peer_id.0; -        let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); +        let table = Arc::new(RoutingTable::new(table_key));          let refresh_service =              RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); @@ -201,8 +201,7 @@ impl Discovery {      async fn connect_loop(self: Arc<Self>) -> Result<()> {          let backoff = Backoff::new(500, self.config.seeding_interval * 1000);          loop { -            let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; -            match random_table_entry { +            match self.table.random_entry(PENDING_ENTRY) {                  Some(entry) => {                      backoff.reset();                      let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -233,18 +232,13 @@ impl Discovery {              match result {                  Err(Error::IncompatiblePeer) => {                      error!("Failed to do handshake: {endpoint_c} incompatible peer"); -                    selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; +                    selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);                  }                  Err(Error::PeerAlreadyConnected) => { -                    // TODO: Use an appropriate status. -                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; -                } -                Err(_) => { -                    selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; -                } -                Ok(_) => { -                    selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; +                    selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)                  } +                Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), +                Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),              }              Ok(()) @@ -257,12 +251,8 @@ impl Discovery {          if let Some(pid) = &pid {              match result { -                Ok(_) => { -                    self.update_table_entry(pid, CONNECTED_ENTRY).await; -                } -                Err(_) => { -                    self.update_table_entry(pid, UNREACHABLE_ENTRY).await; -                } +                Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY), +                Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),              }          }      } @@ -274,16 +264,12 @@ impl Discovery {      /// table doesn't have an available entry, it will connect to one of the      /// provided bootstrap endpoints in the `Config` and initiate the lookup.      async fn start_seeding(&self) { -        match self -            .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) -            .await -        { +        match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {              Some(entry) => {                  let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);                  let peer_id = Some(entry.key.into());                  if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { -                    self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) -                        .await; +                    self.table.update_entry(&entry.key, UNSTABLE_ENTRY);                      error!("Failed to do lookup: {endpoint}: {err}");                  }              } @@ -297,15 +283,4 @@ impl Discovery {              }          }      } - -    /// Returns a random entry from routing table. -    async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { -        self.table.lock().await.random_entry(entry_flag).cloned() -    } - -    /// Update the entry status   -    async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { -        let table = &mut self.table.lock().await; -        table.update_entry(&pid.0, entry_flag); -    }  } | 
