From f3bb85508335eab91fbd76d15e74dcc575195acf Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 19:26:26 +0200 Subject: p2p/backend: add methods to return inbound/outbound connected peers --- p2p/src/peer_pool.rs | 87 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 34 deletions(-) (limited to 'p2p/src/peer_pool.rs') diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 6c895a0..f8dda66 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -9,15 +9,12 @@ use bincode::{Decode, Encode}; use log::{error, info, trace, warn}; use karyon_core::{ - async_runtime::{ - lock::{Mutex, RwLock}, - Executor, - }, + async_runtime::{lock::RwLock, Executor}, async_util::{timeout, TaskGroup, TaskResult}, util::decode, }; -use karyon_net::Conn; +use karyon_net::{Conn, Endpoint}; use crate::{ config::Config, @@ -42,7 +39,7 @@ pub struct PeerPool { conn_queue: Arc, /// Holds the running peers. - peers: Mutex>, + peers: RwLock>, /// Hashmap contains protocol constructors. pub(crate) protocols: RwLock>>, @@ -78,7 +75,7 @@ impl PeerPool { Arc::new(Self { id: id.clone(), conn_queue, - peers: Mutex::new(HashMap::new()), + peers: RwLock::new(HashMap::new()), protocols, protocol_versions, task_group: TaskGroup::with_executor(executor.clone()), @@ -96,26 +93,9 @@ impl PeerPool { Ok(()) } - /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc) { - loop { - let conn = self.conn_queue.next().await; - let signal = conn.disconnect_signal; - - let result = self - .new_peer(conn.conn, &conn.direction, signal.clone()) - .await; - - // Only send a disconnect signal if there is an error when adding a peer. - if result.is_err() { - let _ = signal.send(result).await; - } - } - } - /// Shuts down pub async fn shutdown(&self) { - for (_, peer) in self.peers.lock().await.iter() { + for (_, peer) in self.peers.read().await.iter() { peer.shutdown().await; } @@ -132,14 +112,9 @@ impl PeerPool { Ok(()) } - /// Returns the number of currently connected peers. - pub async fn peers_len(&self) -> usize { - self.peers.lock().await.len() - } - /// Broadcast a message to all connected peers using the specified protocol. pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { - for (pid, peer) in self.peers.lock().await.iter() { + for (pid, peer) in self.peers.read().await.iter() { if let Err(err) = peer.send(proto_id, msg).await { error!("failed to send msg to {pid}: {err}"); continue; @@ -175,7 +150,7 @@ impl PeerPool { ); // Insert the new peer - self.peers.lock().await.insert(pid.clone(), peer.clone()); + self.peers.write().await.insert(pid.clone(), peer.clone()); let selfc = self.clone(); let pid_c = pid.clone(); @@ -199,12 +174,56 @@ impl PeerPool { /// Checks if the peer list contains a peer with the given peer id pub async fn contains_peer(&self, pid: &PeerID) -> bool { - self.peers.lock().await.contains_key(pid) + self.peers.read().await.contains_key(pid) + } + + /// Returns the number of currently connected peers. + pub async fn peers_len(&self) -> usize { + self.peers.read().await.len() + } + + /// Returns a map of inbound peers with their endpoints. + pub async fn inbound_peers(&self) -> HashMap { + let mut peers = HashMap::new(); + for (id, peer) in self.peers.read().await.iter() { + if peer.is_inbound() { + peers.insert(id.clone(), peer.remote_endpoint().clone()); + } + } + peers + } + + /// Returns a map of outbound peers with their endpoints. + pub async fn outbound_peers(&self) -> HashMap { + let mut peers = HashMap::new(); + for (id, peer) in self.peers.read().await.iter() { + if !peer.is_inbound() { + peers.insert(id.clone(), peer.remote_endpoint().clone()); + } + } + peers + } + + /// Listens to a new connection from the connection queue + async fn listen_loop(self: Arc) { + loop { + let conn = self.conn_queue.next().await; + let signal = conn.disconnect_signal; + + let result = self + .new_peer(conn.conn, &conn.direction, signal.clone()) + .await; + + // Only send a disconnect signal if there is an error when adding a peer. + if result.is_err() { + let _ = signal.send(result).await; + } + } } /// Shuts down the peer and remove it from the peer list. async fn remove_peer(&self, pid: &PeerID) -> Result<()> { - let result = self.peers.lock().await.remove(pid); + let result = self.peers.write().await.remove(pid); let peer = match result { Some(p) => p, -- cgit v1.2.3