From 2b032229e46293af92db798a36793c6b8b97baee Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 29 Nov 2023 00:15:10 +0300 Subject: p2p/protocol: improve the Protocol API --- p2p/README.md | 2 +- p2p/examples/chat.rs | 12 ++++++++---- p2p/src/peer/mod.rs | 9 ++++----- p2p/src/peer_pool.rs | 8 ++++---- p2p/src/protocol.rs | 6 +++--- p2p/src/protocols/ping.rs | 14 +++++++------- 6 files changed, 27 insertions(+), 24 deletions(-) (limited to 'p2p') diff --git a/p2p/README.md b/p2p/README.md index 8a8bc19..098cc26 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -90,7 +90,7 @@ impl NewProtocol { #[async_trait] impl Protocol for NewProtocol { - async fn start(self: Arc, ex: Arc>) -> Result<(), P2pError> { + async fn start(self: Arc) -> Result<(), P2pError> { let listener = self.peer.register_listener::().await; loop { let event = listener.recv().await.unwrap(); diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index d94bca4..925c832 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -44,23 +44,25 @@ struct Cli { pub struct ChatProtocol { username: String, peer: ArcPeer, + executor: Arc>, } impl ChatProtocol { - fn new(username: &str, peer: ArcPeer) -> ArcProtocol { + fn new(username: &str, peer: ArcPeer, executor: Arc>) -> ArcProtocol { Arc::new(Self { peer, username: username.to_string(), + executor, }) } } #[async_trait] impl Protocol for ChatProtocol { - async fn start(self: Arc, ex: Arc>) -> Result<(), P2pError> { + async fn start(self: Arc) -> Result<(), P2pError> { let selfc = self.clone(); let stdin = io::stdin(); - let task = ex.spawn(async move { + let task = self.executor.spawn(async move { loop { let mut input = String::new(); stdin.read_line(&mut input).await.unwrap(); @@ -111,6 +113,7 @@ fn main() { peer_endpoints: cli.peer_endpoints, bootstrap_peers: cli.bootstrap_peers, discovery_port: cli.discovery_port.unwrap_or(0), + enable_tls: true, ..Default::default() }; @@ -124,12 +127,13 @@ fn main() { let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); + let ex_cloned = ex.clone(); run_executor( async { let username = cli.username; // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer); + let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone()); backend.attach_protocol::(c).await.unwrap(); // Run the backend diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 6ed0dd8..37c0e2a 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -83,8 +83,8 @@ impl Peer { } /// Run the peer - pub async fn run(self: Arc, ex: GlobalExecutor) -> Result<()> { - self.start_protocols(ex).await; + pub async fn run(self: Arc) -> Result<()> { + self.start_protocols().await; self.read_loop().await } @@ -203,7 +203,7 @@ impl Peer { } /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc, ex: GlobalExecutor) { + async fn start_protocols(self: &Arc) { for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { trace!("peer {} start protocol {protocol_id}", self.id); let protocol = constructor(self.clone()); @@ -223,8 +223,7 @@ impl Peer { } }; - self.task_group - .spawn(protocol.start(ex.clone()), on_failure); + self.task_group.spawn(protocol.start(), on_failure); } } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index dd7e669..ee9ebf9 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -189,8 +189,7 @@ impl PeerPool { } }; - self.task_group - .spawn(peer.run(self.executor.clone()), on_disconnect); + self.task_group.spawn(peer.run(), on_disconnect); info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); @@ -230,8 +229,9 @@ impl PeerPool { /// Attach the core protocols. async fn setup_protocols(&self) -> Result<()> { - self.attach_protocol::(Box::new(PingProtocol::new)) - .await + let executor = self.executor.clone(); + let c = move |peer| PingProtocol::new(peer, executor.clone()); + self.attach_protocol::(Box::new(c)).await } /// Initiate a handshake with a connection. diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 8ddc685..582502e 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use async_trait::async_trait; -use karyons_core::{event::EventValue, Executor}; +use karyons_core::event::EventValue; use crate::{peer::ArcPeer, version::Version, Result}; @@ -56,7 +56,7 @@ impl EventValue for ProtocolEvent { /// /// #[async_trait] /// impl Protocol for NewProtocol { -/// async fn start(self: Arc, ex: Arc>) -> Result<(), P2pError> { +/// async fn start(self: Arc) -> Result<(), P2pError> { /// let listener = self.peer.register_listener::().await; /// loop { /// let event = listener.recv().await.unwrap(); @@ -103,7 +103,7 @@ impl EventValue for ProtocolEvent { #[async_trait] pub trait Protocol: Send + Sync { /// Start the protocol - async fn start(self: Arc, ex: Executor<'_>) -> Result<()>; + async fn start(self: Arc) -> Result<()>; /// Returns the version of the protocol. fn version() -> Result diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index 0a5488d..ec7afe2 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -15,7 +15,7 @@ use karyons_core::{ async_util::{select, timeout, Either, TaskGroup, TaskResult}, event::EventListener, util::decode, - Executor, + GlobalExecutor, }; use karyons_net::NetError; @@ -39,17 +39,19 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, + task_group: TaskGroup<'static>, } impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { peer, ping_interval, ping_timeout, + task_group: TaskGroup::new(executor), }) } @@ -126,16 +128,14 @@ impl PingProtocol { #[async_trait] impl Protocol for PingProtocol { - async fn start(self: Arc, ex: Executor<'_>) -> Result<()> { + async fn start(self: Arc) -> Result<()> { trace!("Start Ping protocol"); - let task_group = TaskGroup::new(ex); - let (pong_chan, pong_chan_recv) = channel::bounded(1); let (stop_signal_s, stop_signal) = channel::bounded::>(1); let selfc = self.clone(); - task_group.spawn( + self.task_group.spawn( selfc.clone().ping_loop(pong_chan_recv.clone()), |res| async move { if let TaskResult::Completed(result) = res { @@ -148,7 +148,7 @@ impl Protocol for PingProtocol { let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; listener.cancel().await; - task_group.cancel().await; + self.task_group.cancel().await; match result { Either::Left(res) => { -- cgit v1.2.3