aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs19
1 files changed, 8 insertions, 11 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 882a93e..bfcab56 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -15,7 +15,7 @@ use karyon_core::{
GlobalExecutor,
};
-use karyon_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
+use karyon_net::{udp, Connection, Endpoint, NetError};
/// Maximum failures for an entry before removing it from the routing table.
pub const MAX_FAILURES: u32 = 3;
@@ -82,12 +82,10 @@ impl RefreshService {
pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
let endpoint = endpoint.read().await;
- let addr = endpoint.addr()?;
- let port = self.config.discovery_port;
let selfc = self.clone();
self.task_group
- .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {
+ .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
@@ -195,8 +193,8 @@ impl RefreshService {
/// specified in the Config, with backoff between each retry.
async fn connect(&self, entry: &Entry) -> Result<()> {
let mut retry = 0;
- let endpoint = Endpoint::Ws(entry.addr.clone(), entry.discovery_port);
- let conn = dial_udp(&endpoint).await?;
+ let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port);
+ let conn = udp::dial(&endpoint).await?;
let backoff = Backoff::new(100, 5000);
while retry < self.config.refresh_connect_retries {
match self.send_ping_msg(&conn).await {
@@ -216,9 +214,8 @@ impl RefreshService {
/// Set up a UDP listener and start listening for Ping messages from other
/// peers.
- async fn listen_loop(self: Arc<Self>, addr: Addr, port: Port) -> Result<()> {
- let endpoint = Endpoint::Udp(addr.clone(), port);
- let conn = match listen_udp(&endpoint).await {
+ async fn listen_loop(self: Arc<Self>, endpoint: Endpoint) -> Result<()> {
+ let conn = match udp::listen(&endpoint).await {
Ok(c) => {
self.monitor
.notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -244,7 +241,7 @@ impl RefreshService {
}
/// Listen to receive a Ping message and respond with a Pong message.
- async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> {
+ async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
let mut buf = [0; PINGMSG_SIZE];
let (_, endpoint) = conn.recv_from(&mut buf).await?;
@@ -266,7 +263,7 @@ impl RefreshService {
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> {
+ async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
let mut nonce: [u8; 32] = [0; 32];
RngCore::fill_bytes(&mut OsRng, &mut nonce);