From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/src/peer_pool.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'p2p/src/peer_pool.rs') diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 2433cfc..0d17307 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -13,7 +13,7 @@ use smol::{ use karyons_core::{ async_utils::{TaskGroup, TaskResult}, utils::decode, - Executor, + GlobalExecutor, }; use karyons_net::Conn; @@ -51,10 +51,13 @@ pub struct PeerPool { protocol_versions: Arc>>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global Executor + executor: GlobalExecutor, /// The Configuration for the P2P network. - pub config: Arc, + pub(crate) config: Arc, /// Responsible for network and system monitoring. monitor: Arc, @@ -67,6 +70,7 @@ impl PeerPool { conn_queue: Arc, config: Arc, monitor: Arc, + executor: GlobalExecutor, ) -> Arc { let protocols = RwLock::new(HashMap::new()); let protocol_versions = Arc::new(RwLock::new(HashMap::new())); @@ -77,23 +81,23 @@ impl PeerPool { peers: Mutex::new(HashMap::new()), protocols, protocol_versions, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, monitor, config, }) } /// Start - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc) -> Result<()> { self.setup_protocols().await?; let selfc = self.clone(); - self.task_group - .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {}); + self.task_group.spawn(selfc.listen_loop(), |_| async {}); Ok(()) } /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc, ex: Executor<'_>) { + pub async fn listen_loop(self: Arc) { loop { let new_conn = self.conn_queue.next().await; let disconnect_signal = new_conn.disconnect_signal; @@ -103,7 +107,6 @@ impl PeerPool { new_conn.conn, &new_conn.direction, disconnect_signal.clone(), - ex.clone(), ) .await; @@ -128,7 +131,7 @@ impl PeerPool { let protocols = &mut self.protocols.write().await; protocol_versions.insert(P::id(), P::version()?); - protocols.insert(P::id(), Box::new(c) as Box); + protocols.insert(P::id(), c); Ok(()) } @@ -153,7 +156,6 @@ impl PeerPool { conn: Conn, conn_direction: &ConnDirection, disconnect_signal: Sender<()>, - ex: Executor<'_>, ) -> Result { let endpoint = conn.peer_endpoint()?; let io_codec = IOCodec::new(conn); @@ -173,6 +175,7 @@ impl PeerPool { io_codec, endpoint.clone(), conn_direction.clone(), + self.executor.clone(), ); // Insert the new peer @@ -190,7 +193,7 @@ impl PeerPool { }; self.task_group - .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect); + .spawn(peer.run(self.executor.clone()), on_disconnect); info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); -- cgit v1.2.3