aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-29 00:15:10 +0300
committerhozan23 <hozan23@proton.me>2023-11-29 00:15:10 +0300
commit2b032229e46293af92db798a36793c6b8b97baee (patch)
tree7b1304c952ff34604e9114d9d15c4687775c714b
parent21e76cf87153c038909d95ff40d982b70003e2fa (diff)
p2p/protocol: improve the Protocol API
-rw-r--r--p2p/README.md2
-rw-r--r--p2p/examples/chat.rs12
-rw-r--r--p2p/src/peer/mod.rs9
-rw-r--r--p2p/src/peer_pool.rs8
-rw-r--r--p2p/src/protocol.rs6
-rw-r--r--p2p/src/protocols/ping.rs14
6 files changed, 27 insertions, 24 deletions
diff --git a/p2p/README.md b/p2p/README.md
index 8a8bc19..098cc26 100644
--- a/p2p/README.md
+++ b/p2p/README.md
@@ -90,7 +90,7 @@ impl NewProtocol {
#[async_trait]
impl Protocol for NewProtocol {
- async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<(), P2pError> {
+ async fn start(self: Arc<Self>) -> Result<(), P2pError> {
let listener = self.peer.register_listener::<Self>().await;
loop {
let event = listener.recv().await.unwrap();
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index d94bca4..925c832 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -44,23 +44,25 @@ struct Cli {
pub struct ChatProtocol {
username: String,
peer: ArcPeer,
+ executor: Arc<Executor<'static>>,
}
impl ChatProtocol {
- fn new(username: &str, peer: ArcPeer) -> ArcProtocol {
+ fn new(username: &str, peer: ArcPeer, executor: Arc<Executor<'static>>) -> ArcProtocol {
Arc::new(Self {
peer,
username: username.to_string(),
+ executor,
})
}
}
#[async_trait]
impl Protocol for ChatProtocol {
- async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<(), P2pError> {
+ async fn start(self: Arc<Self>) -> Result<(), P2pError> {
let selfc = self.clone();
let stdin = io::stdin();
- let task = ex.spawn(async move {
+ let task = self.executor.spawn(async move {
loop {
let mut input = String::new();
stdin.read_line(&mut input).await.unwrap();
@@ -111,6 +113,7 @@ fn main() {
peer_endpoints: cli.peer_endpoints,
bootstrap_peers: cli.bootstrap_peers,
discovery_port: cli.discovery_port.unwrap_or(0),
+ enable_tls: true,
..Default::default()
};
@@ -124,12 +127,13 @@ fn main() {
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
+ let ex_cloned = ex.clone();
run_executor(
async {
let username = cli.username;
// Attach the ChatProtocol
- let c = move |peer| ChatProtocol::new(&username, peer);
+ let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone());
backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
// Run the backend
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 6ed0dd8..37c0e2a 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -83,8 +83,8 @@ impl Peer {
}
/// Run the peer
- pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> {
- self.start_protocols(ex).await;
+ pub async fn run(self: Arc<Self>) -> Result<()> {
+ self.start_protocols().await;
self.read_loop().await
}
@@ -203,7 +203,7 @@ impl Peer {
}
/// Start running the protocols for this peer connection.
- async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) {
+ async fn start_protocols(self: &Arc<Self>) {
for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() {
trace!("peer {} start protocol {protocol_id}", self.id);
let protocol = constructor(self.clone());
@@ -223,8 +223,7 @@ impl Peer {
}
};
- self.task_group
- .spawn(protocol.start(ex.clone()), on_failure);
+ self.task_group.spawn(protocol.start(), on_failure);
}
}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index dd7e669..ee9ebf9 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -189,8 +189,7 @@ impl PeerPool {
}
};
- self.task_group
- .spawn(peer.run(self.executor.clone()), on_disconnect);
+ self.task_group.spawn(peer.run(), on_disconnect);
info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");
@@ -230,8 +229,9 @@ impl PeerPool {
/// Attach the core protocols.
async fn setup_protocols(&self) -> Result<()> {
- self.attach_protocol::<PingProtocol>(Box::new(PingProtocol::new))
- .await
+ let executor = self.executor.clone();
+ let c = move |peer| PingProtocol::new(peer, executor.clone());
+ self.attach_protocol::<PingProtocol>(Box::new(c)).await
}
/// Initiate a handshake with a connection.
diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs
index 8ddc685..582502e 100644
--- a/p2p/src/protocol.rs
+++ b/p2p/src/protocol.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use async_trait::async_trait;
-use karyons_core::{event::EventValue, Executor};
+use karyons_core::event::EventValue;
use crate::{peer::ArcPeer, version::Version, Result};
@@ -56,7 +56,7 @@ impl EventValue for ProtocolEvent {
///
/// #[async_trait]
/// impl Protocol for NewProtocol {
-/// async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<(), P2pError> {
+/// async fn start(self: Arc<Self>) -> Result<(), P2pError> {
/// let listener = self.peer.register_listener::<Self>().await;
/// loop {
/// let event = listener.recv().await.unwrap();
@@ -103,7 +103,7 @@ impl EventValue for ProtocolEvent {
#[async_trait]
pub trait Protocol: Send + Sync {
/// Start the protocol
- async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()>;
+ async fn start(self: Arc<Self>) -> Result<()>;
/// Returns the version of the protocol.
fn version() -> Result<Version>
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index 0a5488d..ec7afe2 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -15,7 +15,7 @@ use karyons_core::{
async_util::{select, timeout, Either, TaskGroup, TaskResult},
event::EventListener,
util::decode,
- Executor,
+ GlobalExecutor,
};
use karyons_net::NetError;
@@ -39,17 +39,19 @@ pub struct PingProtocol {
peer: ArcPeer,
ping_interval: u64,
ping_timeout: u64,
+ task_group: TaskGroup<'static>,
}
impl PingProtocol {
#[allow(clippy::new_ret_no_self)]
- pub fn new(peer: ArcPeer) -> ArcProtocol {
+ pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol {
let ping_interval = peer.config().ping_interval;
let ping_timeout = peer.config().ping_timeout;
Arc::new(Self {
peer,
ping_interval,
ping_timeout,
+ task_group: TaskGroup::new(executor),
})
}
@@ -126,16 +128,14 @@ impl PingProtocol {
#[async_trait]
impl Protocol for PingProtocol {
- async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn start(self: Arc<Self>) -> Result<()> {
trace!("Start Ping protocol");
- let task_group = TaskGroup::new(ex);
-
let (pong_chan, pong_chan_recv) = channel::bounded(1);
let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1);
let selfc = self.clone();
- task_group.spawn(
+ self.task_group.spawn(
selfc.clone().ping_loop(pong_chan_recv.clone()),
|res| async move {
if let TaskResult::Completed(result) = res {
@@ -148,7 +148,7 @@ impl Protocol for PingProtocol {
let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await;
listener.cancel().await;
- task_group.cancel().await;
+ self.task_group.cancel().await;
match result {
Either::Left(res) => {