aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/async_runtime/executor.rs8
-rw-r--r--core/src/async_util/condvar.rs208
-rw-r--r--core/src/async_util/condwait.rs52
-rw-r--r--jsonrpc/examples/pubsub_client.rs2
-rw-r--r--jsonrpc/src/client/mod.rs26
-rw-r--r--jsonrpc/src/error.rs8
-rw-r--r--jsonrpc/src/server/mod.rs97
-rw-r--r--p2p/examples/chat.rs42
-rw-r--r--p2p/src/connector.rs19
-rw-r--r--p2p/src/discovery/lookup.rs12
-rw-r--r--p2p/src/discovery/mod.rs70
-rw-r--r--p2p/src/discovery/refresh.rs45
-rw-r--r--p2p/src/listener.rs27
-rw-r--r--p2p/src/peer/mod.rs19
-rw-r--r--p2p/src/peer_pool.rs27
-rw-r--r--p2p/src/protocols/ping.rs6
16 files changed, 367 insertions, 301 deletions
diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs
index 88f6370..e0b707b 100644
--- a/core/src/async_runtime/executor.rs
+++ b/core/src/async_runtime/executor.rs
@@ -59,11 +59,13 @@ pub fn global_executor() -> Executor {
#[cfg(feature = "tokio")]
fn init_executor() -> Executor {
let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime"));
- let ex_cloned = ex.clone();
thread::Builder::new()
.name("tokio-executor".to_string())
- .spawn(move || {
- catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok();
+ .spawn({
+ let ex = ex.clone();
+ move || {
+ catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok();
+ }
})
.expect("cannot spawn tokio runtime thread");
Executor { inner: ex }
diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs
index 8385982..e425eda 100644
--- a/core/src/async_util/condvar.rs
+++ b/core/src/async_util/condvar.rs
@@ -24,29 +24,33 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16};
/// let val = Arc::new(Mutex::new(false));
/// let condvar = Arc::new(CondVar::new());
///
-/// let val_cloned = val.clone();
-/// let condvar_cloned = condvar.clone();
-/// spawn(async move {
-/// let mut val = val_cloned.lock().await;
+/// spawn({
+/// let val = val.clone();
+/// let condvar = condvar.clone();
+/// async move {
+/// let mut val = val.lock().await;
///
-/// // While the boolean flag is false, wait for a signal.
-/// while !*val {
-/// val = condvar_cloned.wait(val).await;
-/// }
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar.wait(val).await;
+/// }
///
-/// // ...
+/// // ...
+/// }
/// });
///
-/// let condvar_cloned = condvar.clone();
-/// spawn(async move {
-/// let mut val = val.lock().await;
+/// spawn({
+/// let condvar = condvar.clone();
+/// async move {
+/// let mut val = val.lock().await;
///
-/// // While the boolean flag is false, wait for a signal.
-/// while !*val {
-/// val = condvar_cloned.wait(val).await;
-/// }
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar.wait(val).await;
+/// }
///
-/// // ...
+/// // ...
+/// }
/// });
///
/// // Wake up all waiting tasks on this condvar
@@ -253,50 +257,54 @@ mod tests {
let condvar_full = Arc::new(CondVar::new());
let condvar_empty = Arc::new(CondVar::new());
- let queue_cloned = queue.clone();
- let condvar_full_cloned = condvar_full.clone();
- let condvar_empty_cloned = condvar_empty.clone();
+ let _producer1 = spawn({
+ let queue = queue.clone();
+ let condvar_full = condvar_full.clone();
+ let condvar_empty = condvar_empty.clone();
+ async move {
+ for i in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar_full.wait(queue).await;
+ }
- let _producer1 = spawn(async move {
- for i in 1..number_of_tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
+ queue.items.push_back(format!("task {i}"));
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_full_cloned.wait(queue).await;
+ // Wake up the consumer
+ condvar_empty.signal();
}
-
- queue.items.push_back(format!("task {i}"));
-
- // Wake up the consumer
- condvar_empty_cloned.signal();
}
});
- let queue_cloned = queue.clone();
let task_consumed = Arc::new(AtomicUsize::new(0));
- let task_consumed_ = task_consumed.clone();
- let consumer = spawn(async move {
- for _ in 1..number_of_tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-empty
- while queue.is_empty() {
- // Release queue mutex and sleep
- queue = condvar_empty.wait(queue).await;
- }
- let _ = queue.items.pop_front().unwrap();
+ let consumer = spawn({
+ let queue = queue.clone();
+ let task_consumed = task_consumed.clone();
+ async move {
+ for _ in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar_empty.wait(queue).await;
+ }
+
+ let _ = queue.items.pop_front().unwrap();
- task_consumed_.fetch_add(1, Ordering::Relaxed);
+ task_consumed.fetch_add(1, Ordering::Relaxed);
- // Do something
+ // Do something
- // Wake up the producer
- condvar_full.signal();
+ // Wake up the producer
+ condvar_full.signal();
+ }
}
});
@@ -314,70 +322,76 @@ mod tests {
let queue = Arc::new(Mutex::new(Queue::new(5)));
let condvar = Arc::new(CondVar::new());
- let queue_cloned = queue.clone();
- let condvar_cloned = condvar.clone();
- let _producer1 = spawn(async move {
- for i in 1..tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_cloned.wait(queue).await;
- }
+ let _producer1 = spawn({
+ let queue = queue.clone();
+ let condvar = condvar.clone();
+ async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
- queue.items.push_back(format!("producer1: task {i}"));
+ queue.items.push_back(format!("producer1: task {i}"));
- // Wake up all producer and consumer tasks
- condvar_cloned.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
});
- let queue_cloned = queue.clone();
- let condvar_cloned = condvar.clone();
- let _producer2 = spawn(async move {
- for i in 1..tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_cloned.wait(queue).await;
- }
+ let _producer2 = spawn({
+ let queue = queue.clone();
+ let condvar = condvar.clone();
+ async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
- queue.items.push_back(format!("producer2: task {i}"));
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
+
+ queue.items.push_back(format!("producer2: task {i}"));
- // Wake up all producer and consumer tasks
- condvar_cloned.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
});
- let queue_cloned = queue.clone();
let task_consumed = Arc::new(AtomicUsize::new(0));
- let task_consumed_ = task_consumed.clone();
- let consumer = spawn(async move {
- for _ in 1..((tasks * 2) - 1) {
- {
- // Lock queue mutex
- let mut queue = queue_cloned.lock().await;
+ let consumer = spawn({
+ let queue = queue.clone();
+ let task_consumed = task_consumed.clone();
+ async move {
+ for _ in 1..((tasks * 2) - 1) {
+ {
+ // Lock queue mutex
+ let mut queue = queue.lock().await;
- // Check if the queue is non-empty
- while queue.is_empty() {
- // Release queue mutex and sleep
- queue = condvar.wait(queue).await;
- }
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
- let _ = queue.items.pop_front().unwrap();
+ let _ = queue.items.pop_front().unwrap();
- task_consumed_.fetch_add(1, Ordering::Relaxed);
+ task_consumed.fetch_add(1, Ordering::Relaxed);
- // Do something
+ // Do something
- // Wake up all producer and consumer tasks
- condvar.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
}
});
diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs
index 76c6a05..b96d979 100644
--- a/core/src/async_util/condwait.rs
+++ b/core/src/async_util/condwait.rs
@@ -13,10 +13,12 @@ use crate::async_runtime::lock::Mutex;
///
/// async {
/// let cond_wait = Arc::new(CondWait::new());
-/// let cond_wait_cloned = cond_wait.clone();
-/// let task = spawn(async move {
-/// cond_wait_cloned.wait().await;
-/// // ...
+/// let task = spawn({
+/// let cond_wait = cond_wait.clone();
+/// async move {
+/// cond_wait.wait().await;
+/// // ...
+/// }
/// });
///
/// cond_wait.signal().await;
@@ -91,12 +93,14 @@ mod tests {
let cond_wait = Arc::new(CondWait::new());
let count = Arc::new(AtomicUsize::new(0));
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
// Send a signal to the waiting task
@@ -109,20 +113,24 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1);
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task1 = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task1 = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task2 = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task2 = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
// Broadcast a signal to all waiting tasks
diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs
index 823089d..bb3ce4c 100644
--- a/jsonrpc/examples/pubsub_client.rs
+++ b/jsonrpc/examples/pubsub_client.rs
@@ -16,8 +16,6 @@ async fn run_client() {
.await
.expect("Build a client");
- smol::spawn(async move {}).detach();
-
let sub = client
.subscribe("Calc.log_subscribe", ())
.await
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index 43ce5fc..6f6bd97 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -260,20 +260,26 @@ impl Client {
}
fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) {
- let selfc = self.clone();
- let on_complete = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Client stopped: {err}");
+ let on_complete = {
+ let this = self.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Client stopped: {err}");
+ }
+ this.disconnect.store(true, Ordering::Relaxed);
+ this.subscriptions.clear().await;
+ this.message_dispatcher.clear().await;
}
- selfc.disconnect.store(true, Ordering::Relaxed);
- selfc.subscriptions.clear().await;
- selfc.message_dispatcher.clear().await;
};
- let selfc = self.clone();
// Spawn a new task
- self.task_group
- .spawn(selfc.background_loop(conn), on_complete);
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.background_loop(conn).await }
+ },
+ on_complete,
+ );
}
async fn background_loop(self: Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> {
diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs
index 1b38519..48d0139 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/src/error.rs
@@ -45,7 +45,7 @@ pub enum Error {
ChannelRecv(#[from] async_channel::RecvError),
#[error("Channel send Error: {0}")]
- ChannelSend(&'static str),
+ ChannelSend(String),
#[error("Unexpected Error: {0}")]
General(&'static str),
@@ -59,7 +59,7 @@ pub enum Error {
impl<T> From<async_channel::SendError<T>> for Error {
fn from(error: async_channel::SendError<T>) -> Self {
- Error::ChannelSend(error.to_string().leak())
+ Error::ChannelSend(error.to_string())
}
}
@@ -78,7 +78,7 @@ pub enum RPCError {
InvalidRequest(&'static str),
#[error("Parse Error: {0}")]
- ParseError(&'static str),
+ ParseError(String),
#[error("Internal Error")]
InternalError,
@@ -86,6 +86,6 @@ pub enum RPCError {
impl From<serde_json::Error> for RPCError {
fn from(error: serde_json::Error) -> Self {
- RPCError::ParseError(error.to_string().leak())
+ RPCError::ParseError(error.to_string())
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 8d5cd2c..9d1f5b7 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -77,19 +77,21 @@ impl Server {
}
};
- let selfc = self.clone();
// Spawns a new task for each new incoming connection
self.task_group.spawn(
- async move {
- loop {
- match selfc.listener.accept().await {
- Ok(conn) => {
- if let Err(err) = selfc.handle_conn(conn).await {
- error!("Handle a new connection: {err}")
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ match this.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = this.handle_conn(conn).await {
+ error!("Handle a new connection: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Accept a new connection: {err}")
}
- }
- Err(err) => {
- error!("Accept a new connection: {err}")
}
}
}
@@ -126,31 +128,33 @@ impl Server {
chan.close();
};
- let conn_cloned = conn.clone();
- let queue_cloned = queue.clone();
// Start listening for new responses in the queue or new notifications
self.task_group.spawn(
- async move {
- loop {
- // The select function will prioritize the first future if both futures are ready.
- // This gives priority to the responses in the response queue.
- match select(queue_cloned.recv(), ch_rx.recv()).await {
- Either::Left(res) => {
- conn_cloned.send(res).await?;
- }
- Either::Right(notification) => {
- let nt = notification?;
- let params = Some(serde_json::json!(message::NotificationResult {
- subscription: nt.sub_id,
- result: Some(nt.result),
- }));
- let notification = message::Notification {
- jsonrpc: message::JSONRPC_VERSION.to_string(),
- method: nt.method,
- params,
- };
- debug!("--> {notification}");
- conn_cloned.send(serde_json::json!(notification)).await?;
+ {
+ let conn = conn.clone();
+ let queue = queue.clone();
+ async move {
+ loop {
+ // The select function will prioritize the first future if both futures are ready.
+ // This gives priority to the responses in the response queue.
+ match select(queue.recv(), ch_rx.recv()).await {
+ Either::Left(res) => {
+ conn.send(res).await?;
+ }
+ Either::Right(notification) => {
+ let nt = notification?;
+ let params = Some(serde_json::json!(message::NotificationResult {
+ subscription: nt.sub_id,
+ result: Some(nt.result),
+ }));
+ let notification = message::Notification {
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
+ method: nt.method,
+ params,
+ };
+ debug!("--> {notification}");
+ conn.send(serde_json::json!(notification)).await?;
+ }
}
}
}
@@ -169,13 +173,15 @@ impl Server {
chan.close();
};
- let selfc = self.clone();
// Spawn a new task and wait for new requests.
self.task_group.spawn(
- async move {
- loop {
- let msg = conn.recv().await?;
- selfc.new_request(queue.clone(), channel.clone(), msg).await;
+ {
+ let this = self.clone();
+ async move {
+ loop {
+ let msg = conn.recv().await?;
+ this.new_request(queue.clone(), channel.clone(), msg).await;
+ }
}
},
on_complete,
@@ -254,15 +260,18 @@ impl Server {
error!("Handle a new request: {err}");
}
};
- let selfc = self.clone();
+
// Spawns a new task for handling the new request, and push the
// response to the response queue.
self.task_group.spawn(
- async move {
- let response = selfc.handle_request(channel, msg).await;
- debug!("--> {response}");
- queue.push(serde_json::json!(response)).await;
- Ok(())
+ {
+ let this = self.clone();
+ async move {
+ let response = this.handle_request(channel, msg).await;
+ debug!("--> {response}");
+ queue.push(serde_json::json!(response)).await;
+ Ok(())
+ }
},
on_complete,
);
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 4dd87c5..1ad215c 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -59,14 +59,16 @@ impl ChatProtocol {
#[async_trait]
impl Protocol for ChatProtocol {
async fn start(self: Arc<Self>) -> Result<(), Error> {
- let selfc = self.clone();
let stdin = io::stdin();
- let task = self.executor.spawn(async move {
- loop {
- let mut input = String::new();
- stdin.read_line(&mut input).await.unwrap();
- let msg = format!("> {}: {}", selfc.username, input.trim());
- selfc.peer.broadcast(&Self::id(), &msg).await;
+ let task = self.executor.spawn({
+ let this = self.clone();
+ async move {
+ loop {
+ let mut input = String::new();
+ stdin.read_line(&mut input).await.unwrap();
+ let msg = format!("> {}: {}", this.username, input.trim());
+ this.peer.broadcast(&Self::id(), &msg).await;
+ }
}
});
@@ -126,23 +128,25 @@ fn main() {
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let ex_cloned = ex.clone();
run_executor(
- async {
- let username = cli.username;
+ {
+ let ex = ex.clone();
+ async {
+ let username = cli.username;
- // Attach the ChatProtocol
- let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone().into());
- backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+ // Attach the ChatProtocol
+ let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into());
+ backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
- // Run the backend
- backend.run().await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
+ // Shutdown the backend
+ backend.shutdown().await;
+ }
},
ex,
);
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index a44daea..cfa661b 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -127,17 +127,18 @@ impl Connector {
{
let conn = self.connect(endpoint, peer_id).await?;
- let selfc = self.clone();
let endpoint = endpoint.clone();
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- trace!("Outbound connection dropped: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ trace!("Outbound connection dropped: {err}");
+ }
+ this.monitor
+ .notify(ConnEvent::Disconnected(endpoint.clone()))
+ .await;
+ this.connection_slots.remove().await;
}
- selfc
- .monitor
- .notify(ConnEvent::Disconnected(endpoint.clone()))
- .await;
- selfc.connection_slots.remove().await;
};
self.task_group.spawn(callback(conn), on_disconnect);
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 8e06eef..9ddf614 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -283,11 +283,13 @@ impl LookupService {
let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
- let selfc = self.clone();
- let callback = |conn: Conn<NetMsg>| async move {
- let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
- timeout(t, selfc.handle_inbound(conn)).await??;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |conn: Conn<NetMsg>| async move {
+ let t = Duration::from_secs(this.config.lookup_connection_lifespan);
+ timeout(t, this.handle_inbound(conn)).await??;
+ Ok(())
+ }
};
self.listener.start(endpoint.clone(), callback).await?;
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index dae4d3f..a9d99d6 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -154,13 +154,17 @@ impl Discovery {
}
// Start connect loop
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.connect_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.connect_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -177,10 +181,12 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
- let selfc = self.clone();
- let callback = |c: Conn<NetMsg>| async move {
- selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
- Ok(())
+ let callback = {
+ let this = self.clone();
+ |c: Conn<NetMsg>| async move {
+ this.conn_queue.handle(c, ConnDirection::Inbound).await?;
+ Ok(())
+ }
};
let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
@@ -212,31 +218,33 @@ impl Discovery {
/// Connect to the given endpoint using the connector
async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
- let selfc = self.clone();
- let pid_c = pid.clone();
- let endpoint_c = endpoint.clone();
- let cback = |conn: Conn<NetMsg>| async move {
- let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
-
- // If the entry is not in the routing table, ignore the result
- let pid = match pid_c {
- Some(p) => p,
- None => return Ok(()),
- };
-
- match result {
- Err(Error::IncompatiblePeer) => {
- error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
- }
- Err(Error::PeerAlreadyConnected) => {
- selfc.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ let cback = {
+ let this = self.clone();
+ let endpoint = endpoint.clone();
+ let pid = pid.clone();
+ |conn: Conn<NetMsg>| async move {
+ let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
+
+ // If the entry is not in the routing table, ignore the result
+ let pid = match pid {
+ Some(p) => p,
+ None => return Ok(()),
+ };
+
+ match result {
+ Err(Error::IncompatiblePeer) => {
+ error!("Failed to do handshake: {endpoint} incompatible peer");
+ this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
+ }
+ Err(Error::PeerAlreadyConnected) => {
+ this.table.update_entry(&pid.0, CONNECTED_ENTRY)
+ }
+ Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
+ Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
}
- Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY),
- Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
- }
- Ok(())
+ Ok(())
+ }
};
let result = self
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index c1d222b..b4f5396 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -38,9 +38,6 @@ pub struct RefreshService {
/// Managing spawned tasks.
task_group: TaskGroup,
- /// A global executor
- executor: Executor,
-
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -65,7 +62,6 @@ impl RefreshService {
table,
listen_endpoint,
task_group: TaskGroup::with_executor(executor.clone()),
- executor,
config,
monitor,
}
@@ -74,24 +70,32 @@ impl RefreshService {
/// Start the refresh service
pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
- let endpoint = endpoint.read().await;
+ let endpoint = endpoint.read().await.clone();
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(endpoint.clone()), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(endpoint).await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- });
+ },
+ );
}
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.refresh_loop(), |res| async move {
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.refresh_loop().await }
+ },
+ |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- });
+ },
+ );
Ok(())
}
@@ -133,25 +137,22 @@ impl RefreshService {
}
}
- /// Iterates over the entries and spawns a new task for each entry to
- /// initiate a connection attempt.
+ /// Iterates over the entries and initiates a connection.
async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
- let ex = &self.executor;
- // Enforce a maximum of 16 concurrent connections.
+ use futures_util::stream::{FuturesUnordered, StreamExt};
+ // Enforce a maximum of 16 connections.
for chunk in entries.chunks(16) {
- let mut tasks = Vec::new();
+ let mut tasks = FuturesUnordered::new();
for bucket_entry in chunk {
if bucket_entry.failures >= MAX_FAILURES {
self.table.remove_entry(&bucket_entry.entry.key);
continue;
}
- tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone())))
+ tasks.push(self.clone().refresh_entry(bucket_entry.clone()))
}
- for task in tasks {
- let _ = task.await;
- }
+ while tasks.next().await.is_some() {}
}
}
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 923eb18..8a5deaa 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -87,9 +87,13 @@ impl Listener {
info!("Start listening on {resolved_endpoint}");
- let selfc = self.clone();
- self.task_group
- .spawn(selfc.listen_loop(listener, callback), |_| async {});
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop(listener, callback).await }
+ },
+ |_| async {},
+ );
Ok(resolved_endpoint)
}
@@ -135,16 +139,15 @@ impl Listener {
self.connection_slots.add();
- let selfc = self.clone();
- let on_disconnect = |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- debug!("Inbound connection dropped: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ |res| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ debug!("Inbound connection dropped: {err}");
+ }
+ this.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
+ this.connection_slots.remove().await;
}
- selfc
- .monitor
- .notify(ConnEvent::Disconnected(endpoint))
- .await;
- selfc.connection_slots.remove().await;
};
let callback = callback.clone();
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 2068789..6903294 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -209,16 +209,17 @@ impl Peer {
self.protocol_ids.write().await.push(protocol_id.clone());
- let selfc = self.clone();
- let proto_idc = protocol_id.clone();
-
- let on_failure = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(res) = result {
- if res.is_err() {
- error!("protocol {} stopped", proto_idc);
+ let on_failure = {
+ let this = self.clone();
+ let protocol_id = protocol_id.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(res) = result {
+ if res.is_err() {
+ error!("protocol {} stopped", protocol_id);
+ }
+ // Send a stop signal to read loop
+ let _ = this.stop_chan.0.try_send(res);
}
- // Send a stop signal to read loop
- let _ = selfc.stop_chan.0.try_send(res);
}
};
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 02a74ac..1f3ca55 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -78,11 +78,16 @@ impl PeerPool {
})
}
- /// Start
+ /// Starts the [`PeerPool`]
pub async fn start(self: &Arc<Self>) -> Result<()> {
self.setup_protocols().await?;
- let selfc = self.clone();
- self.task_group.spawn(selfc.listen_loop(), |_| async {});
+ self.task_group.spawn(
+ {
+ let this = self.clone();
+ async move { this.listen_loop().await }
+ },
+ |_| async {},
+ );
Ok(())
}
@@ -145,14 +150,16 @@ impl PeerPool {
// Insert the new peer
self.peers.write().await.insert(pid.clone(), peer.clone());
- let selfc = self.clone();
- let pid_c = pid.clone();
- let on_disconnect = |result| async move {
- if let TaskResult::Completed(result) = result {
- if let Err(err) = selfc.remove_peer(&pid_c).await {
- error!("Failed to remove peer {pid_c}: {err}");
+ let on_disconnect = {
+ let this = self.clone();
+ let pid = pid.clone();
+ |result| async move {
+ if let TaskResult::Completed(result) = result {
+ if let Err(err) = this.remove_peer(&pid).await {
+ error!("Failed to remove peer {pid}: {err}");
+ }
+ let _ = disconnect_signal.send(result).await;
}
- let _ = disconnect_signal.send(result).await;
}
};
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index ef1b54e..b800b23 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -128,9 +128,11 @@ impl Protocol for PingProtocol {
let (pong_chan, pong_chan_recv) = async_channel::bounded(1);
let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1);
- let selfc = self.clone();
self.task_group.spawn(
- selfc.clone().ping_loop(pong_chan_recv.clone()),
+ {
+ let this = self.clone();
+ async move { this.ping_loop(pong_chan_recv.clone()).await }
+ },
|res| async move {
if let TaskResult::Completed(result) = res {
let _ = stop_signal_s.send(result).await;