From df6aad2be4c6c5d11483f20e62d41e71f0ac989e Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 13 Mar 2024 12:33:34 +0100 Subject: net: major cleanup and improvement of the crate api --- p2p/src/discovery/refresh.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) (limited to 'p2p/src/discovery/refresh.rs') diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 882a93e..bfcab56 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -15,7 +15,7 @@ use karyon_core::{ GlobalExecutor, }; -use karyon_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; +use karyon_net::{udp, Connection, Endpoint, NetError}; /// Maximum failures for an entry before removing it from the routing table. pub const MAX_FAILURES: u32 = 3; @@ -82,12 +82,10 @@ impl RefreshService { pub async fn start(self: &Arc) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { let endpoint = endpoint.read().await; - let addr = endpoint.addr()?; - let port = self.config.discovery_port; let selfc = self.clone(); self.task_group - .spawn(selfc.listen_loop(addr.clone(), port), |res| async move { + .spawn(selfc.listen_loop(endpoint.clone()), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } @@ -195,8 +193,8 @@ impl RefreshService { /// specified in the Config, with backoff between each retry. async fn connect(&self, entry: &Entry) -> Result<()> { let mut retry = 0; - let endpoint = Endpoint::Ws(entry.addr.clone(), entry.discovery_port); - let conn = dial_udp(&endpoint).await?; + let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port); + let conn = udp::dial(&endpoint).await?; let backoff = Backoff::new(100, 5000); while retry < self.config.refresh_connect_retries { match self.send_ping_msg(&conn).await { @@ -216,9 +214,8 @@ impl RefreshService { /// Set up a UDP listener and start listening for Ping messages from other /// peers. - async fn listen_loop(self: Arc, addr: Addr, port: Port) -> Result<()> { - let endpoint = Endpoint::Udp(addr.clone(), port); - let conn = match listen_udp(&endpoint).await { + async fn listen_loop(self: Arc, endpoint: Endpoint) -> Result<()> { + let conn = match udp::listen(&endpoint).await { Ok(c) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -244,7 +241,7 @@ impl RefreshService { } /// Listen to receive a Ping message and respond with a Pong message. - async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> { + async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { let mut buf = [0; PINGMSG_SIZE]; let (_, endpoint) = conn.recv_from(&mut buf).await?; @@ -266,7 +263,7 @@ impl RefreshService { } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> { + async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { let mut nonce: [u8; 32] = [0; 32]; RngCore::fill_bytes(&mut OsRng, &mut nonce); -- cgit v1.2.3