aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery/refresh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/discovery/refresh.rs')
-rw-r--r--p2p/src/discovery/refresh.rs28
1 files changed, 13 insertions, 15 deletions
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index b4f5396..1452a1b 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -2,10 +2,11 @@ use std::{sync::Arc, time::Duration};
use bincode::{Decode, Encode};
use log::{error, info, trace};
+use parking_lot::RwLock;
use rand::{rngs::OsRng, RngCore};
use karyon_core::{
- async_runtime::{lock::RwLock, Executor},
+ async_runtime::Executor,
async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
@@ -33,7 +34,7 @@ pub struct RefreshService {
table: Arc<RoutingTable>,
/// Resolved listen endpoint
- listen_endpoint: Option<RwLock<Endpoint>>,
+ listen_endpoint: RwLock<Option<Endpoint>>,
/// Managing spawned tasks.
task_group: TaskGroup,
@@ -53,14 +54,9 @@ impl RefreshService {
monitor: Arc<Monitor>,
executor: Executor,
) -> Self {
- let listen_endpoint = config
- .listen_endpoint
- .as_ref()
- .map(|endpoint| RwLock::new(endpoint.clone()));
-
Self {
table,
- listen_endpoint,
+ listen_endpoint: RwLock::new(None),
task_group: TaskGroup::with_executor(executor.clone()),
config,
monitor,
@@ -69,9 +65,8 @@ impl RefreshService {
/// Start the refresh service
pub async fn start(self: &Arc<Self>) -> Result<()> {
- if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await.clone();
-
+ if let Some(endpoint) = self.listen_endpoint.read().as_ref() {
+ let endpoint = endpoint.clone();
self.task_group.spawn(
{
let this = self.clone();
@@ -101,10 +96,13 @@ impl RefreshService {
}
/// Set the resolved listen endpoint.
- pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) {
- if let Some(endpoint) = &self.listen_endpoint {
- *endpoint.write().await = resolved_endpoint.clone();
- }
+ pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
+ let resolved_endpoint = Endpoint::Udp(
+ resolved_endpoint.addr()?.clone(),
+ self.config.discovery_port,
+ );
+ *self.listen_endpoint.write() = Some(resolved_endpoint);
+ Ok(())
}
/// Shuts down the refresh service