aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/src/backend.rs15
-rw-r--r--p2p/src/discovery/mod.rs35
-rw-r--r--p2p/src/peer_pool.rs87
-rw-r--r--p2p/src/slots.rs5
4 files changed, 77 insertions, 65 deletions
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 16cc20b..f21d70b 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -1,8 +1,9 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
use log::info;
use karyon_core::{async_runtime::Executor, crypto::KeyPair};
+use karyon_net::Endpoint;
use crate::{
config::Config,
@@ -111,14 +112,14 @@ impl Backend {
&self.key_pair
}
- /// Returns the number of occupied inbound slots.
- pub fn inbound_slots(&self) -> usize {
- self.discovery.inbound_slots.load()
+ /// Returns a map of inbound connected peers with their endpoints.
+ pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ self.peer_pool.inbound_peers().await
}
- /// Returns the number of occupied outbound slots.
- pub fn outbound_slots(&self) -> usize {
- self.discovery.outbound_slots.load()
+ /// Returns a map of outbound connected peers with their endpoints.
+ pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ self.peer_pool.outbound_peers().await
}
/// Returns the monitor to receive system events.
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 529469e..99f880d 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -53,12 +53,6 @@ pub struct Discovery {
/// Connection queue
conn_queue: Arc<ConnQueue>,
- /// Inbound slots.
- pub(crate) inbound_slots: Arc<ConnectionSlots>,
-
- /// Outbound slots.
- pub(crate) outbound_slots: Arc<ConnectionSlots>,
-
/// Managing spawned tasks.
task_group: TaskGroup,
@@ -76,23 +70,25 @@ impl Discovery {
monitor: Arc<Monitor>,
ex: Executor,
) -> ArcDiscovery {
- let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
- let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
+ let table = Arc::new(RoutingTable::new(peer_id.0));
- let table_key = peer_id.0;
- let table = Arc::new(RoutingTable::new(table_key));
+ let refresh_service = Arc::new(RefreshService::new(
+ config.clone(),
+ table.clone(),
+ monitor.clone(),
+ ex.clone(),
+ ));
- let refresh_service =
- RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
- let lookup_service = LookupService::new(
+ let lookup_service = Arc::new(LookupService::new(
key_pair,
peer_id,
table.clone(),
config.clone(),
monitor.clone(),
ex.clone(),
- );
+ ));
+ let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
let connector = Connector::new(
key_pair,
config.max_connect_retries,
@@ -102,6 +98,7 @@ impl Discovery {
ex.clone(),
);
+ let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let listener = Listener::new(
key_pair,
inbound_slots.clone(),
@@ -110,16 +107,16 @@ impl Discovery {
ex.clone(),
);
+ let task_group = TaskGroup::with_executor(ex);
+
Arc::new(Self {
- refresh_service: Arc::new(refresh_service),
- lookup_service: Arc::new(lookup_service),
+ refresh_service,
+ lookup_service,
conn_queue,
table,
- inbound_slots,
- outbound_slots,
connector,
listener,
- task_group: TaskGroup::with_executor(ex),
+ task_group,
config,
})
}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 6c895a0..f8dda66 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -9,15 +9,12 @@ use bincode::{Decode, Encode};
use log::{error, info, trace, warn};
use karyon_core::{
- async_runtime::{
- lock::{Mutex, RwLock},
- Executor,
- },
+ async_runtime::{lock::RwLock, Executor},
async_util::{timeout, TaskGroup, TaskResult},
util::decode,
};
-use karyon_net::Conn;
+use karyon_net::{Conn, Endpoint};
use crate::{
config::Config,
@@ -42,7 +39,7 @@ pub struct PeerPool {
conn_queue: Arc<ConnQueue>,
/// Holds the running peers.
- peers: Mutex<HashMap<PeerID, ArcPeer>>,
+ peers: RwLock<HashMap<PeerID, ArcPeer>>,
/// Hashmap contains protocol constructors.
pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>,
@@ -78,7 +75,7 @@ impl PeerPool {
Arc::new(Self {
id: id.clone(),
conn_queue,
- peers: Mutex::new(HashMap::new()),
+ peers: RwLock::new(HashMap::new()),
protocols,
protocol_versions,
task_group: TaskGroup::with_executor(executor.clone()),
@@ -96,26 +93,9 @@ impl PeerPool {
Ok(())
}
- /// Listens to a new connection from the connection queue
- pub async fn listen_loop(self: Arc<Self>) {
- loop {
- let conn = self.conn_queue.next().await;
- let signal = conn.disconnect_signal;
-
- let result = self
- .new_peer(conn.conn, &conn.direction, signal.clone())
- .await;
-
- // Only send a disconnect signal if there is an error when adding a peer.
- if result.is_err() {
- let _ = signal.send(result).await;
- }
- }
- }
-
/// Shuts down
pub async fn shutdown(&self) {
- for (_, peer) in self.peers.lock().await.iter() {
+ for (_, peer) in self.peers.read().await.iter() {
peer.shutdown().await;
}
@@ -132,14 +112,9 @@ impl PeerPool {
Ok(())
}
- /// Returns the number of currently connected peers.
- pub async fn peers_len(&self) -> usize {
- self.peers.lock().await.len()
- }
-
/// Broadcast a message to all connected peers using the specified protocol.
pub async fn broadcast<T: Decode + Encode>(&self, proto_id: &ProtocolID, msg: &T) {
- for (pid, peer) in self.peers.lock().await.iter() {
+ for (pid, peer) in self.peers.read().await.iter() {
if let Err(err) = peer.send(proto_id, msg).await {
error!("failed to send msg to {pid}: {err}");
continue;
@@ -175,7 +150,7 @@ impl PeerPool {
);
// Insert the new peer
- self.peers.lock().await.insert(pid.clone(), peer.clone());
+ self.peers.write().await.insert(pid.clone(), peer.clone());
let selfc = self.clone();
let pid_c = pid.clone();
@@ -199,12 +174,56 @@ impl PeerPool {
/// Checks if the peer list contains a peer with the given peer id
pub async fn contains_peer(&self, pid: &PeerID) -> bool {
- self.peers.lock().await.contains_key(pid)
+ self.peers.read().await.contains_key(pid)
+ }
+
+ /// Returns the number of currently connected peers.
+ pub async fn peers_len(&self) -> usize {
+ self.peers.read().await.len()
+ }
+
+ /// Returns a map of inbound peers with their endpoints.
+ pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ let mut peers = HashMap::new();
+ for (id, peer) in self.peers.read().await.iter() {
+ if peer.is_inbound() {
+ peers.insert(id.clone(), peer.remote_endpoint().clone());
+ }
+ }
+ peers
+ }
+
+ /// Returns a map of outbound peers with their endpoints.
+ pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
+ let mut peers = HashMap::new();
+ for (id, peer) in self.peers.read().await.iter() {
+ if !peer.is_inbound() {
+ peers.insert(id.clone(), peer.remote_endpoint().clone());
+ }
+ }
+ peers
+ }
+
+ /// Listens to a new connection from the connection queue
+ async fn listen_loop(self: Arc<Self>) {
+ loop {
+ let conn = self.conn_queue.next().await;
+ let signal = conn.disconnect_signal;
+
+ let result = self
+ .new_peer(conn.conn, &conn.direction, signal.clone())
+ .await;
+
+ // Only send a disconnect signal if there is an error when adding a peer.
+ if result.is_err() {
+ let _ = signal.send(result).await;
+ }
+ }
}
/// Shuts down the peer and remove it from the peer list.
async fn remove_peer(&self, pid: &PeerID) -> Result<()> {
- let result = self.peers.lock().await.remove(pid);
+ let result = self.peers.write().await.remove(pid);
let peer = match result {
Some(p) => p,
diff --git a/p2p/src/slots.rs b/p2p/src/slots.rs
index 0ee0b93..8e3b98e 100644
--- a/p2p/src/slots.rs
+++ b/p2p/src/slots.rs
@@ -22,11 +22,6 @@ impl ConnectionSlots {
}
}
- /// Returns the number of occupied slots
- pub fn load(&self) -> usize {
- self.slots.load(Ordering::SeqCst)
- }
-
/// Increases the occupied slots by one.
pub fn add(&self) {
self.slots.fetch_add(1, Ordering::SeqCst);