aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer_pool.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-24 19:26:26 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-24 19:26:26 +0200
commitf3bb85508335eab91fbd76d15e74dcc575195acf (patch)
treecddf1241f12763327a5d9e8365424d8fac52e82c /p2p/src/peer_pool.rs
parentcc6b474b0d35f5fa3f00a742b1c0e18a9a1a25a3 (diff)
p2p/backend: add methods to return inbound/outbound connected peers
Diffstat (limited to 'p2p/src/peer_pool.rs')
-rw-r--r--p2p/src/peer_pool.rs87
1 files changed, 53 insertions, 34 deletions
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 6c895a0..f8dda66 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -9,15 +9,12 @@ use bincode::{Decode, Encode};
use log::{error, info, trace, warn};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::{timeout, TaskGroup, TaskResult},
util::decode,
};
-use karyon_net::Conn;
+use karyon_net::{Conn, Endpoint};
use crate::{
config::Config,
@@ -42,7 +39,7 @@ pub struct PeerPool {
conn_queue: Arc<ConnQueue>,
/// Holds the running peers.
- peers: Mutex<HashMap<PeerID, ArcPeer>>,
+ peers: RwLock<HashMap<PeerID, ArcPeer>>,
/// Hashmap contains protocol constructors.
pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>,
@@ -78,7 +75,7 @@ impl PeerPool {
Arc::new(Self {
id: id.clone(),
conn_queue,
- peers: Mutex::new(HashMap::new()),
+ peers: RwLock::new(HashMap::new()),
protocols,
protocol_versions,
task_group: TaskGroup::with_executor(executor.clone()),
@@ -96,26 +93,9 @@ impl PeerPool {
Ok(())
}
- /// Listens to a new connection from the connection queue
- pub async fn listen_loop(self: Arc<Self>) {
- loop {
- let conn = self.conn_queue.next().await;
- let signal = conn.disconnect_signal;
-
- let result = self
- .new_peer(conn.conn, &conn.direction, signal.clone())
- .await;
-
- // Only send a disconnect signal if there is an error when adding a peer.
- if result.is_err() {
- let _ = signal.send(result).await;
- }
- }
- }
-
/// Shuts down
pub async fn shutdown(&self) {
- for (_, peer) in self.peers.lock().await.iter() {
+ for (_, peer) in self.peers.read().await.iter() {
peer.shutdown().await;
}
@@ -132,14 +112,9 @@ impl PeerPool {
Ok(())
}
- /// Returns the number of currently connected peers.
- pub async fn peers_len(&self) -> usize {
- self.peers.lock().await.len()
- }
-
/// Broadcast a message to all connected peers using the specified protocol.
pub async fn broadcast<T: Decode + Encode>(&self, proto_id: &ProtocolID, msg: &T) {
- for (pid, peer) in self.peers.lock().await.iter() {
+ for (pid, peer) in self.peers.read().await.iter() {
if let Err(err) = peer.send(proto_id, msg).await {
error!("failed to send msg to {pid}: {err}");
continue;
@@ -175,7 +150,7 @@ impl PeerPool {
);
// Insert the new peer
- self.peers.lock().await.insert(pid.clone(), peer.clone());
+ self.peers.write().await.insert(pid.clone(), peer.clone());
let selfc = self.clone();
let pid_c = pid.clone();
@@ -199,12 +174,56 @@ impl PeerPool {
/// Checks if the peer list contains a peer with the given peer id
pub async fn contains_peer(&self, pid: &PeerID) -> bool {
- self.peers.lock().await.contains_key(pid)
+ self.peers.read().await.contains_key(pid)
+ }
+
+ /// Returns the number of currently connected peers.
+ pub async fn peers_len(&self) -> usize {
+ self.peers.read().await.len()
+ }
+
+ /// Returns a map of inbound peers with their endpoints.
+ pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ let mut peers = HashMap::new();
+ for (id, peer) in self.peers.read().await.iter() {
+ if peer.is_inbound() {
+ peers.insert(id.clone(), peer.remote_endpoint().clone());
+ }
+ }
+ peers
+ }
+
+ /// Returns a map of outbound peers with their endpoints.
+ pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ let mut peers = HashMap::new();
+ for (id, peer) in self.peers.read().await.iter() {
+ if !peer.is_inbound() {
+ peers.insert(id.clone(), peer.remote_endpoint().clone());
+ }
+ }
+ peers
+ }
+
+ /// Listens to a new connection from the connection queue
+ async fn listen_loop(self: Arc<Self>) {
+ loop {
+ let conn = self.conn_queue.next().await;
+ let signal = conn.disconnect_signal;
+
+ let result = self
+ .new_peer(conn.conn, &conn.direction, signal.clone())
+ .await;
+
+ // Only send a disconnect signal if there is an error when adding a peer.
+ if result.is_err() {
+ let _ = signal.send(result).await;
+ }
+ }
}
/// Shuts down the peer and remove it from the peer list.
async fn remove_peer(&self, pid: &PeerID) -> Result<()> {
- let result = self.peers.lock().await.remove(pid);
+ let result = self.peers.write().await.remove(pid);
let peer = match result {
Some(p) => p,