From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- p2p/src/discovery/refresh.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index b4f5396..1452a1b 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -2,10 +2,11 @@ use std::{sync::Arc, time::Duration}; use bincode::{Decode, Encode}; use log::{error, info, trace}; +use parking_lot::RwLock; use rand::{rngs::OsRng, RngCore}; use karyon_core::{ - async_runtime::{lock::RwLock, Executor}, + async_runtime::Executor, async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult}, }; @@ -33,7 +34,7 @@ pub struct RefreshService { table: Arc, /// Resolved listen endpoint - listen_endpoint: Option>, + listen_endpoint: RwLock>, /// Managing spawned tasks. task_group: TaskGroup, @@ -53,14 +54,9 @@ impl RefreshService { monitor: Arc, executor: Executor, ) -> Self { - let listen_endpoint = config - .listen_endpoint - .as_ref() - .map(|endpoint| RwLock::new(endpoint.clone())); - Self { table, - listen_endpoint, + listen_endpoint: RwLock::new(None), task_group: TaskGroup::with_executor(executor.clone()), config, monitor, @@ -69,9 +65,8 @@ impl RefreshService { /// Start the refresh service pub async fn start(self: &Arc) -> Result<()> { - if let Some(endpoint) = &self.listen_endpoint { - let endpoint = endpoint.read().await.clone(); - + if let Some(endpoint) = self.listen_endpoint.read().as_ref() { + let endpoint = endpoint.clone(); self.task_group.spawn( { let this = self.clone(); @@ -101,10 +96,13 @@ impl RefreshService { } /// Set the resolved listen endpoint. - pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { - if let Some(endpoint) = &self.listen_endpoint { - *endpoint.write().await = resolved_endpoint.clone(); - } + pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> { + let resolved_endpoint = Endpoint::Udp( + resolved_endpoint.addr()?.clone(), + self.config.discovery_port, + ); + *self.listen_endpoint.write() = Some(resolved_endpoint); + Ok(()) } /// Shuts down the refresh service -- cgit v1.2.3