From f3bb85508335eab91fbd76d15e74dcc575195acf Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 19:26:26 +0200 Subject: p2p/backend: add methods to return inbound/outbound connected peers --- p2p/src/backend.rs | 15 +++++---- p2p/src/discovery/mod.rs | 35 +++++++++---------- p2p/src/peer_pool.rs | 87 +++++++++++++++++++++++++++++------------------- p2p/src/slots.rs | 5 --- 4 files changed, 77 insertions(+), 65 deletions(-) (limited to 'p2p/src') diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 16cc20b..f21d70b 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -1,8 +1,9 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use log::info; use karyon_core::{async_runtime::Executor, crypto::KeyPair}; +use karyon_net::Endpoint; use crate::{ config::Config, @@ -111,14 +112,14 @@ impl Backend { &self.key_pair } - /// Returns the number of occupied inbound slots. - pub fn inbound_slots(&self) -> usize { - self.discovery.inbound_slots.load() + /// Returns a map of inbound connected peers with their endpoints. + pub async fn inbound_peers(&self) -> HashMap { + self.peer_pool.inbound_peers().await } - /// Returns the number of occupied outbound slots. - pub fn outbound_slots(&self) -> usize { - self.discovery.outbound_slots.load() + /// Returns a map of outbound connected peers with their endpoints. + pub async fn outbound_peers(&self) -> HashMap { + self.peer_pool.outbound_peers().await } /// Returns the monitor to receive system events. diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 529469e..99f880d 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -53,12 +53,6 @@ pub struct Discovery { /// Connection queue conn_queue: Arc, - /// Inbound slots. - pub(crate) inbound_slots: Arc, - - /// Outbound slots. - pub(crate) outbound_slots: Arc, - /// Managing spawned tasks. task_group: TaskGroup, @@ -76,23 +70,25 @@ impl Discovery { monitor: Arc, ex: Executor, ) -> ArcDiscovery { - let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); - let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); + let table = Arc::new(RoutingTable::new(peer_id.0)); - let table_key = peer_id.0; - let table = Arc::new(RoutingTable::new(table_key)); + let refresh_service = Arc::new(RefreshService::new( + config.clone(), + table.clone(), + monitor.clone(), + ex.clone(), + )); - let refresh_service = - RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); - let lookup_service = LookupService::new( + let lookup_service = Arc::new(LookupService::new( key_pair, peer_id, table.clone(), config.clone(), monitor.clone(), ex.clone(), - ); + )); + let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); let connector = Connector::new( key_pair, config.max_connect_retries, @@ -102,6 +98,7 @@ impl Discovery { ex.clone(), ); + let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let listener = Listener::new( key_pair, inbound_slots.clone(), @@ -110,16 +107,16 @@ impl Discovery { ex.clone(), ); + let task_group = TaskGroup::with_executor(ex); + Arc::new(Self { - refresh_service: Arc::new(refresh_service), - lookup_service: Arc::new(lookup_service), + refresh_service, + lookup_service, conn_queue, table, - inbound_slots, - outbound_slots, connector, listener, - task_group: TaskGroup::with_executor(ex), + task_group, config, }) } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 6c895a0..f8dda66 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -9,15 +9,12 @@ use bincode::{Decode, Encode}; use log::{error, info, trace, warn}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::{timeout, TaskGroup, TaskResult}, util::decode, }; -use karyon_net::Conn; +use karyon_net::{Conn, Endpoint}; use crate::{ config::Config, @@ -42,7 +39,7 @@ pub struct PeerPool { conn_queue: Arc, /// Holds the running peers. - peers: Mutex>, + peers: RwLock>, /// Hashmap contains protocol constructors. pub(crate) protocols: RwLock>>, @@ -78,7 +75,7 @@ impl PeerPool { Arc::new(Self { id: id.clone(), conn_queue, - peers: Mutex::new(HashMap::new()), + peers: RwLock::new(HashMap::new()), protocols, protocol_versions, task_group: TaskGroup::with_executor(executor.clone()), @@ -96,26 +93,9 @@ impl PeerPool { Ok(()) } - /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc) { - loop { - let conn = self.conn_queue.next().await; - let signal = conn.disconnect_signal; - - let result = self - .new_peer(conn.conn, &conn.direction, signal.clone()) - .await; - - // Only send a disconnect signal if there is an error when adding a peer. - if result.is_err() { - let _ = signal.send(result).await; - } - } - } - /// Shuts down pub async fn shutdown(&self) { - for (_, peer) in self.peers.lock().await.iter() { + for (_, peer) in self.peers.read().await.iter() { peer.shutdown().await; } @@ -132,14 +112,9 @@ impl PeerPool { Ok(()) } - /// Returns the number of currently connected peers. - pub async fn peers_len(&self) -> usize { - self.peers.lock().await.len() - } - /// Broadcast a message to all connected peers using the specified protocol. pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { - for (pid, peer) in self.peers.lock().await.iter() { + for (pid, peer) in self.peers.read().await.iter() { if let Err(err) = peer.send(proto_id, msg).await { error!("failed to send msg to {pid}: {err}"); continue; @@ -175,7 +150,7 @@ impl PeerPool { ); // Insert the new peer - self.peers.lock().await.insert(pid.clone(), peer.clone()); + self.peers.write().await.insert(pid.clone(), peer.clone()); let selfc = self.clone(); let pid_c = pid.clone(); @@ -199,12 +174,56 @@ impl PeerPool { /// Checks if the peer list contains a peer with the given peer id pub async fn contains_peer(&self, pid: &PeerID) -> bool { - self.peers.lock().await.contains_key(pid) + self.peers.read().await.contains_key(pid) + } + + /// Returns the number of currently connected peers. + pub async fn peers_len(&self) -> usize { + self.peers.read().await.len() + } + + /// Returns a map of inbound peers with their endpoints. + pub async fn inbound_peers(&self) -> HashMap { + let mut peers = HashMap::new(); + for (id, peer) in self.peers.read().await.iter() { + if peer.is_inbound() { + peers.insert(id.clone(), peer.remote_endpoint().clone()); + } + } + peers + } + + /// Returns a map of outbound peers with their endpoints. + pub async fn outbound_peers(&self) -> HashMap { + let mut peers = HashMap::new(); + for (id, peer) in self.peers.read().await.iter() { + if !peer.is_inbound() { + peers.insert(id.clone(), peer.remote_endpoint().clone()); + } + } + peers + } + + /// Listens to a new connection from the connection queue + async fn listen_loop(self: Arc) { + loop { + let conn = self.conn_queue.next().await; + let signal = conn.disconnect_signal; + + let result = self + .new_peer(conn.conn, &conn.direction, signal.clone()) + .await; + + // Only send a disconnect signal if there is an error when adding a peer. + if result.is_err() { + let _ = signal.send(result).await; + } + } } /// Shuts down the peer and remove it from the peer list. async fn remove_peer(&self, pid: &PeerID) -> Result<()> { - let result = self.peers.lock().await.remove(pid); + let result = self.peers.write().await.remove(pid); let peer = match result { Some(p) => p, diff --git a/p2p/src/slots.rs b/p2p/src/slots.rs index 0ee0b93..8e3b98e 100644 --- a/p2p/src/slots.rs +++ b/p2p/src/slots.rs @@ -22,11 +22,6 @@ impl ConnectionSlots { } } - /// Returns the number of occupied slots - pub fn load(&self) -> usize { - self.slots.load(Ordering::SeqCst) - } - /// Increases the occupied slots by one. pub fn add(&self) { self.slots.fetch_add(1, Ordering::SeqCst); -- cgit v1.2.3