diff options
Diffstat (limited to 'p2p/src')
-rw-r--r-- | p2p/src/peer/mod.rs | 9 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 8 | ||||
-rw-r--r-- | p2p/src/protocol.rs | 6 | ||||
-rw-r--r-- | p2p/src/protocols/ping.rs | 14 |
4 files changed, 18 insertions, 19 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 6ed0dd8..37c0e2a 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -83,8 +83,8 @@ impl Peer { } /// Run the peer - pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> { - self.start_protocols(ex).await; + pub async fn run(self: Arc<Self>) -> Result<()> { + self.start_protocols().await; self.read_loop().await } @@ -203,7 +203,7 @@ impl Peer { } /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) { + async fn start_protocols(self: &Arc<Self>) { for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { trace!("peer {} start protocol {protocol_id}", self.id); let protocol = constructor(self.clone()); @@ -223,8 +223,7 @@ impl Peer { } }; - self.task_group - .spawn(protocol.start(ex.clone()), on_failure); + self.task_group.spawn(protocol.start(), on_failure); } } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index dd7e669..ee9ebf9 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -189,8 +189,7 @@ impl PeerPool { } }; - self.task_group - .spawn(peer.run(self.executor.clone()), on_disconnect); + self.task_group.spawn(peer.run(), on_disconnect); info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); @@ -230,8 +229,9 @@ impl PeerPool { /// Attach the core protocols. async fn setup_protocols(&self) -> Result<()> { - self.attach_protocol::<PingProtocol>(Box::new(PingProtocol::new)) - .await + let executor = self.executor.clone(); + let c = move |peer| PingProtocol::new(peer, executor.clone()); + self.attach_protocol::<PingProtocol>(Box::new(c)).await } /// Initiate a handshake with a connection. diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 8ddc685..582502e 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use async_trait::async_trait; -use karyons_core::{event::EventValue, Executor}; +use karyons_core::event::EventValue; use crate::{peer::ArcPeer, version::Version, Result}; @@ -56,7 +56,7 @@ impl EventValue for ProtocolEvent { /// /// #[async_trait] /// impl Protocol for NewProtocol { -/// async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<(), P2pError> { +/// async fn start(self: Arc<Self>) -> Result<(), P2pError> { /// let listener = self.peer.register_listener::<Self>().await; /// loop { /// let event = listener.recv().await.unwrap(); @@ -103,7 +103,7 @@ impl EventValue for ProtocolEvent { #[async_trait] pub trait Protocol: Send + Sync { /// Start the protocol - async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()>; + async fn start(self: Arc<Self>) -> Result<()>; /// Returns the version of the protocol. fn version() -> Result<Version> diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index 0a5488d..ec7afe2 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -15,7 +15,7 @@ use karyons_core::{ async_util::{select, timeout, Either, TaskGroup, TaskResult}, event::EventListener, util::decode, - Executor, + GlobalExecutor, }; use karyons_net::NetError; @@ -39,17 +39,19 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, + task_group: TaskGroup<'static>, } impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { peer, ping_interval, ping_timeout, + task_group: TaskGroup::new(executor), }) } @@ -126,16 +128,14 @@ impl PingProtocol { #[async_trait] impl Protocol for PingProtocol { - async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn start(self: Arc<Self>) -> Result<()> { trace!("Start Ping protocol"); - let task_group = TaskGroup::new(ex); - let (pong_chan, pong_chan_recv) = channel::bounded(1); let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1); let selfc = self.clone(); - task_group.spawn( + self.task_group.spawn( selfc.clone().ping_loop(pong_chan_recv.clone()), |res| async move { if let TaskResult::Completed(result) = res { @@ -148,7 +148,7 @@ impl Protocol for PingProtocol { let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; listener.cancel().await; - task_group.cancel().await; + self.task_group.cancel().await; match result { Either::Left(res) => { |