aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/lookup.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-23 15:43:04 +0200
commit998568ab76cc8ba36fe47d5fca17bcc997aa391c (patch)
treeeb6e1239498f8d7179eb2e3e0b74c6396bcea51b /p2p/src/discovery/lookup.rs
parentc9785e8cc5b6a9a722ba0aff1eb33c2dbf020f2e (diff)
p2p: wrap the buckets with mutex in RoutingTable
Diffstat (limited to 'p2p/src/discovery/lookup.rs')
-rw-r--r--p2p/src/discovery/lookup.rs28
1 files changed, 9 insertions, 19 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 4a06083..3beea7e 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,10 +5,7 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::timeout,
crypto::KeyPair,
util::decode,
@@ -38,7 +35,7 @@ pub struct LookupService {
id: PeerID,
/// Routing Table
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
/// Listener
listener: Arc<Listener>,
@@ -63,7 +60,7 @@ impl LookupService {
pub fn new(
key_pair: &KeyPair,
id: &PeerID,
- table: Arc<Mutex<RoutingTable>>,
+ table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
ex: Executor,
@@ -158,12 +155,10 @@ impl LookupService {
}
}
- let mut table = self.table.lock().await;
for peer in peer_buffer.iter() {
- let result = table.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry {:?}", result);
}
- drop(table);
self.monitor
.notify(DiscoveryEvent::LookupSucceeded(
@@ -190,11 +185,10 @@ impl LookupService {
.connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
.await?;
- let table = self.table.lock().await;
for peer in peers {
if random_peers.contains(&peer)
|| peer.peer_id == self.id
- || table.contains_key(&peer.peer_id.0)
+ || self.table.contains_key(&peer.peer_id.0)
{
continue;
}
@@ -233,10 +227,6 @@ impl LookupService {
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
- self.monitor
- .notify(ConnEvent::Connected(endpoint.clone()))
- .await;
-
let result = self.handle_outbound(conn, target_peer_id).await;
self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
@@ -318,7 +308,7 @@ impl LookupService {
}
NetMsgCmd::Peer => {
let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
- let result = self.table.lock().await.add_entry(peer.clone().into());
+ let result = self.table.add_entry(peer.clone().into());
trace!("Add entry result: {:?}", result);
}
c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
@@ -381,9 +371,9 @@ impl LookupService {
/// Sends a Peers msg.
async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Peers msg");
- let table = self.table.lock().await;
- let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
- drop(table);
+ let entries = self
+ .table
+ .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?)