diff options
Diffstat (limited to 'p2p/src/peer_pool.rs')
-rw-r--r-- | p2p/src/peer_pool.rs | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 2433cfc..0d17307 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -13,7 +13,7 @@ use smol::{ use karyons_core::{ async_utils::{TaskGroup, TaskResult}, utils::decode, - Executor, + GlobalExecutor, }; use karyons_net::Conn; @@ -51,10 +51,13 @@ pub struct PeerPool { protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global Executor + executor: GlobalExecutor, /// The Configuration for the P2P network. - pub config: Arc<Config>, + pub(crate) config: Arc<Config>, /// Responsible for network and system monitoring. monitor: Arc<Monitor>, @@ -67,6 +70,7 @@ impl PeerPool { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, + executor: GlobalExecutor, ) -> Arc<Self> { let protocols = RwLock::new(HashMap::new()); let protocol_versions = Arc::new(RwLock::new(HashMap::new())); @@ -77,23 +81,23 @@ impl PeerPool { peers: Mutex::new(HashMap::new()), protocols, protocol_versions, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, monitor, config, }) } /// Start - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc<Self>) -> Result<()> { self.setup_protocols().await?; let selfc = self.clone(); - self.task_group - .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {}); + self.task_group.spawn(selfc.listen_loop(), |_| async {}); Ok(()) } /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc<Self>, ex: Executor<'_>) { + pub async fn listen_loop(self: Arc<Self>) { loop { let new_conn = self.conn_queue.next().await; let disconnect_signal = new_conn.disconnect_signal; @@ -103,7 +107,6 @@ impl PeerPool { new_conn.conn, &new_conn.direction, disconnect_signal.clone(), - ex.clone(), ) .await; @@ -128,7 +131,7 @@ impl PeerPool { let protocols = &mut self.protocols.write().await; protocol_versions.insert(P::id(), P::version()?); - protocols.insert(P::id(), Box::new(c) as Box<ProtocolConstructor>); + protocols.insert(P::id(), c); Ok(()) } @@ -153,7 +156,6 @@ impl PeerPool { conn: Conn, conn_direction: &ConnDirection, disconnect_signal: Sender<()>, - ex: Executor<'_>, ) -> Result<PeerID> { let endpoint = conn.peer_endpoint()?; let io_codec = IOCodec::new(conn); @@ -173,6 +175,7 @@ impl PeerPool { io_codec, endpoint.clone(), conn_direction.clone(), + self.executor.clone(), ); // Insert the new peer @@ -190,7 +193,7 @@ impl PeerPool { }; self.task_group - .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect); + .spawn(peer.run(self.executor.clone()), on_disconnect); info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); |