From 998568ab76cc8ba36fe47d5fca17bcc997aa391c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 23 May 2024 15:43:04 +0200 Subject: p2p: wrap the buckets with mutex in RoutingTable --- p2p/src/discovery/lookup.rs | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) (limited to 'p2p/src/discovery/lookup.rs') diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 4a06083..3beea7e 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,10 +5,7 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::timeout, crypto::KeyPair, util::decode, @@ -38,7 +35,7 @@ pub struct LookupService { id: PeerID, /// Routing Table - table: Arc>, + table: Arc, /// Listener listener: Arc, @@ -63,7 +60,7 @@ impl LookupService { pub fn new( key_pair: &KeyPair, id: &PeerID, - table: Arc>, + table: Arc, config: Arc, monitor: Arc, ex: Executor, @@ -158,12 +155,10 @@ impl LookupService { } } - let mut table = self.table.lock().await; for peer in peer_buffer.iter() { - let result = table.add_entry(peer.clone().into()); + let result = self.table.add_entry(peer.clone().into()); trace!("Add entry {:?}", result); } - drop(table); self.monitor .notify(DiscoveryEvent::LookupSucceeded( @@ -190,11 +185,10 @@ impl LookupService { .connect(endpoint.clone(), peer_id.clone(), &random_peer_id) .await?; - let table = self.table.lock().await; for peer in peers { if random_peers.contains(&peer) || peer.peer_id == self.id - || table.contains_key(&peer.peer_id.0) + || self.table.contains_key(&peer.peer_id.0) { continue; } @@ -233,10 +227,6 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result> { let conn = self.connector.connect(&endpoint, &peer_id).await?; - self.monitor - .notify(ConnEvent::Connected(endpoint.clone())) - .await; - let result = self.handle_outbound(conn, target_peer_id).await; self.monitor.notify(ConnEvent::Disconnected(endpoint)).await; @@ -318,7 +308,7 @@ impl LookupService { } NetMsgCmd::Peer => { let (peer, _) = decode::(&msg.payload)?; - let result = self.table.lock().await.add_entry(peer.clone().into()); + let result = self.table.add_entry(peer.clone().into()); trace!("Add entry result: {:?}", result); } c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), @@ -381,9 +371,9 @@ impl LookupService { /// Sends a Peers msg. async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn) -> Result<()> { trace!("Send Peers msg"); - let table = self.table.lock().await; - let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); - drop(table); + let entries = self + .table + .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); let peers: Vec = entries.into_iter().map(|e| e.into()).collect(); conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?) -- cgit v1.2.3