aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer_pool.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3 /p2p/src/peer_pool.rs
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
Diffstat (limited to 'p2p/src/peer_pool.rs')
-rw-r--r--p2p/src/peer_pool.rs27
1 files changed, 15 insertions, 12 deletions
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 2433cfc..0d17307 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -13,7 +13,7 @@ use smol::{
use karyons_core::{
async_utils::{TaskGroup, TaskResult},
utils::decode,
- Executor,
+ GlobalExecutor,
};
use karyons_net::Conn;
@@ -51,10 +51,13 @@ pub struct PeerPool {
protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
+
+ /// A global Executor
+ executor: GlobalExecutor,
/// The Configuration for the P2P network.
- pub config: Arc<Config>,
+ pub(crate) config: Arc<Config>,
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
@@ -67,6 +70,7 @@ impl PeerPool {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ executor: GlobalExecutor,
) -> Arc<Self> {
let protocols = RwLock::new(HashMap::new());
let protocol_versions = Arc::new(RwLock::new(HashMap::new()));
@@ -77,23 +81,23 @@ impl PeerPool {
peers: Mutex::new(HashMap::new()),
protocols,
protocol_versions,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(executor.clone()),
+ executor,
monitor,
config,
})
}
/// Start
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
self.setup_protocols().await?;
let selfc = self.clone();
- self.task_group
- .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {});
+ self.task_group.spawn(selfc.listen_loop(), |_| async {});
Ok(())
}
/// Listens to a new connection from the connection queue
- pub async fn listen_loop(self: Arc<Self>, ex: Executor<'_>) {
+ pub async fn listen_loop(self: Arc<Self>) {
loop {
let new_conn = self.conn_queue.next().await;
let disconnect_signal = new_conn.disconnect_signal;
@@ -103,7 +107,6 @@ impl PeerPool {
new_conn.conn,
&new_conn.direction,
disconnect_signal.clone(),
- ex.clone(),
)
.await;
@@ -128,7 +131,7 @@ impl PeerPool {
let protocols = &mut self.protocols.write().await;
protocol_versions.insert(P::id(), P::version()?);
- protocols.insert(P::id(), Box::new(c) as Box<ProtocolConstructor>);
+ protocols.insert(P::id(), c);
Ok(())
}
@@ -153,7 +156,6 @@ impl PeerPool {
conn: Conn,
conn_direction: &ConnDirection,
disconnect_signal: Sender<()>,
- ex: Executor<'_>,
) -> Result<PeerID> {
let endpoint = conn.peer_endpoint()?;
let io_codec = IOCodec::new(conn);
@@ -173,6 +175,7 @@ impl PeerPool {
io_codec,
endpoint.clone(),
conn_direction.clone(),
+ self.executor.clone(),
);
// Insert the new peer
@@ -190,7 +193,7 @@ impl PeerPool {
};
self.task_group
- .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect);
+ .spawn(peer.run(self.executor.clone()), on_disconnect);
info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");