diff options
author | hozan23 <hozan23@karyontech.net> | 2024-07-15 13:16:01 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-07-15 13:16:01 +0200 |
commit | e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch) | |
tree | 7976f6993e4f6b3646f5bd6954189346d5ffd330 /p2p/src/protocols/ping.rs | |
parent | 6c65232d741229635151671708556b9af7ef75ac (diff) |
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol
for initializing a connection with a peer.
Move the handshake logic from the PeerPool module to the protocols directory and
build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'p2p/src/protocols/ping.rs')
-rw-r--r-- | p2p/src/protocols/ping.rs | 39 |
1 files changed, 17 insertions, 22 deletions
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index b800b23..f35b203 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -9,7 +9,6 @@ use rand::{rngs::OsRng, RngCore}; use karyon_core::{ async_runtime::Executor, async_util::{select, sleep, timeout, Either, TaskGroup, TaskResult}, - event::EventListener, util::decode, }; @@ -39,9 +38,12 @@ pub struct PingProtocol { impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: Arc<Peer>, executor: Executor) -> Arc<dyn Protocol> { - let ping_interval = peer.config().ping_interval; - let ping_timeout = peer.config().ping_timeout; + pub fn new( + peer: Arc<Peer>, + ping_interval: u64, + ping_timeout: u64, + executor: Executor, + ) -> Arc<dyn Protocol> { Arc::new(Self { peer, ping_interval, @@ -50,13 +52,9 @@ impl PingProtocol { }) } - async fn recv_loop( - &self, - listener: &EventListener<ProtocolID, ProtocolEvent>, - pong_chan: Sender<[u8; 32]>, - ) -> Result<()> { + async fn recv_loop(&self, pong_chan: Sender<[u8; 32]>) -> Result<()> { loop { - let event = listener.recv().await?; + let event = self.peer.recv::<Self>().await?; let msg_payload = match event.clone() { ProtocolEvent::Message(m) => m, ProtocolEvent::Shutdown => { @@ -70,7 +68,7 @@ impl PingProtocol { PingProtocolMsg::Ping(nonce) => { trace!("Received Ping message {:?}", nonce); self.peer - .send(&Self::id(), &PingProtocolMsg::Pong(nonce)) + .send(Self::id(), &PingProtocolMsg::Pong(nonce)) .await?; trace!("Send back Pong message {:?}", nonce); } @@ -82,7 +80,7 @@ impl PingProtocol { Ok(()) } - async fn ping_loop(self: Arc<Self>, chan: Receiver<[u8; 32]>) -> Result<()> { + async fn ping_loop(&self, chan: Receiver<[u8; 32]>) -> Result<()> { let rng = &mut OsRng; let mut retry = 0; @@ -94,12 +92,11 @@ impl PingProtocol { trace!("Send Ping message {:?}", ping_nonce); self.peer - .send(&Self::id(), &PingProtocolMsg::Ping(ping_nonce)) + .send(Self::id(), &PingProtocolMsg::Ping(ping_nonce)) .await?; - let d = Duration::from_secs(self.ping_timeout); - // Wait for Pong message + let d = Duration::from_secs(self.ping_timeout); let pong_msg = match timeout(d, chan.recv()).await { Ok(m) => m?, Err(_) => { @@ -107,13 +104,14 @@ impl PingProtocol { continue; } }; - trace!("Received Pong message {:?}", pong_msg); if pong_msg != ping_nonce { retry += 1; continue; } + + retry = 0; } Err(NetError::Timeout.into()) @@ -125,8 +123,8 @@ impl Protocol for PingProtocol { async fn start(self: Arc<Self>) -> Result<()> { trace!("Start Ping protocol"); + let stop_signal = async_channel::bounded::<Result<()>>(1); let (pong_chan, pong_chan_recv) = async_channel::bounded(1); - let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1); self.task_group.spawn( { @@ -135,15 +133,12 @@ impl Protocol for PingProtocol { }, |res| async move { if let TaskResult::Completed(result) = res { - let _ = stop_signal_s.send(result).await; + let _ = stop_signal.0.send(result).await; } }, ); - let listener = self.peer.register_listener::<Self>().await; - - let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; - listener.cancel().await; + let result = select(self.recv_loop(pong_chan), stop_signal.1.recv()).await; self.task_group.cancel().await; match result { |