From df6aad2be4c6c5d11483f20e62d41e71f0ac989e Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@proton.me>
Date: Wed, 13 Mar 2024 12:33:34 +0100
Subject: net: major cleanup and improvement of the crate api

---
 p2p/src/connector.rs         |  8 +++++---
 p2p/src/discovery/refresh.rs | 19 ++++++++-----------
 p2p/src/listener.rs          | 14 ++++++++------
 3 files changed, 21 insertions(+), 20 deletions(-)

(limited to 'p2p/src')

diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index e83d8da..41839ab 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -7,7 +7,7 @@ use karyon_core::{
     crypto::KeyPair,
     GlobalExecutor,
 };
-use karyon_net::{dial, tls, Conn, Endpoint, NetError};
+use karyon_net::{tcp, tls, Conn, Endpoint, NetError};
 
 use crate::{
     monitor::{ConnEvent, Monitor},
@@ -142,9 +142,11 @@ impl Connector {
     async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
         if self.enable_tls {
             let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?;
-            tls::dial(endpoint, tls_config, DNS_NAME).await
+            tls::dial(endpoint, tls_config, DNS_NAME)
+                .await
+                .map(|l| Box::new(l) as Conn)
         } else {
-            dial(endpoint).await
+            tcp::dial(endpoint).await.map(|l| Box::new(l) as Conn)
         }
         .map_err(Error::KaryonNet)
     }
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 882a93e..bfcab56 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -15,7 +15,7 @@ use karyon_core::{
     GlobalExecutor,
 };
 
-use karyon_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
+use karyon_net::{udp, Connection, Endpoint, NetError};
 
 /// Maximum failures for an entry before removing it from the routing table.
 pub const MAX_FAILURES: u32 = 3;
@@ -82,12 +82,10 @@ impl RefreshService {
     pub async fn start(self: &Arc<Self>) -> Result<()> {
         if let Some(endpoint) = &self.listen_endpoint {
             let endpoint = endpoint.read().await;
-            let addr = endpoint.addr()?;
-            let port = self.config.discovery_port;
 
             let selfc = self.clone();
             self.task_group
-                .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {
+                .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
                     if let TaskResult::Completed(Err(err)) = res {
                         error!("Listen loop stopped: {err}");
                     }
@@ -195,8 +193,8 @@ impl RefreshService {
     /// specified in the Config, with backoff between each retry.
     async fn connect(&self, entry: &Entry) -> Result<()> {
         let mut retry = 0;
-        let endpoint = Endpoint::Ws(entry.addr.clone(), entry.discovery_port);
-        let conn = dial_udp(&endpoint).await?;
+        let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port);
+        let conn = udp::dial(&endpoint).await?;
         let backoff = Backoff::new(100, 5000);
         while retry < self.config.refresh_connect_retries {
             match self.send_ping_msg(&conn).await {
@@ -216,9 +214,8 @@ impl RefreshService {
 
     /// Set up a UDP listener and start listening for Ping messages from other
     /// peers.
-    async fn listen_loop(self: Arc<Self>, addr: Addr, port: Port) -> Result<()> {
-        let endpoint = Endpoint::Udp(addr.clone(), port);
-        let conn = match listen_udp(&endpoint).await {
+    async fn listen_loop(self: Arc<Self>, endpoint: Endpoint) -> Result<()> {
+        let conn = match udp::listen(&endpoint).await {
             Ok(c) => {
                 self.monitor
                     .notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -244,7 +241,7 @@ impl RefreshService {
     }
 
     /// Listen to receive a Ping message and respond with a Pong message.
-    async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> {
+    async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
         let mut buf = [0; PINGMSG_SIZE];
         let (_, endpoint) = conn.recv_from(&mut buf).await?;
 
@@ -266,7 +263,7 @@ impl RefreshService {
     }
 
     /// Sends a Ping msg and wait to receive the Pong message.
-    async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> {
+    async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
         let mut nonce: [u8; 32] = [0; 32];
         RngCore::fill_bytes(&mut OsRng, &mut nonce);
 
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 254e4e6..17aa187 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -8,7 +8,7 @@ use karyon_core::{
     GlobalExecutor,
 };
 
-use karyon_net::{listen, tls, Conn, ConnListener, Endpoint};
+use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint};
 
 use crate::{
     monitor::{ConnEvent, Monitor},
@@ -67,7 +67,7 @@ impl Listener {
     where
         Fut: Future<Output = Result<()>> + Send + 'static,
     {
-        let listener = match self.listend(&endpoint).await {
+        let listener = match self.listen(&endpoint).await {
             Ok(listener) => {
                 self.monitor
                     .notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -152,14 +152,16 @@ impl Listener {
         }
     }
 
-    async fn listend(&self, endpoint: &Endpoint) -> Result<Box<dyn ConnListener>> {
+    async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener> {
         if self.enable_tls {
             let tls_config = tls_server_config(&self.key_pair)?;
-            tls::listen_tls(endpoint, tls_config)
+            tls::listen(endpoint, tls_config)
                 .await
-                .map(|l| Box::new(l) as Box<dyn ConnListener>)
+                .map(|l| Box::new(l) as karyon_net::Listener)
         } else {
-            listen(endpoint).await
+            tcp::listen(endpoint)
+                .await
+                .map(|l| Box::new(l) as karyon_net::Listener)
         }
         .map_err(Error::KaryonNet)
     }
-- 
cgit v1.2.3