diff options
-rw-r--r-- | core/src/async_runtime/executor.rs | 8 | ||||
-rw-r--r-- | core/src/async_util/condvar.rs | 208 | ||||
-rw-r--r-- | core/src/async_util/condwait.rs | 52 | ||||
-rw-r--r-- | jsonrpc/examples/pubsub_client.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/client/mod.rs | 26 | ||||
-rw-r--r-- | jsonrpc/src/error.rs | 8 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 97 | ||||
-rw-r--r-- | p2p/examples/chat.rs | 42 | ||||
-rw-r--r-- | p2p/src/connector.rs | 19 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 12 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 70 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 45 | ||||
-rw-r--r-- | p2p/src/listener.rs | 27 | ||||
-rw-r--r-- | p2p/src/peer/mod.rs | 19 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 27 | ||||
-rw-r--r-- | p2p/src/protocols/ping.rs | 6 |
16 files changed, 367 insertions, 301 deletions
diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs index 88f6370..e0b707b 100644 --- a/core/src/async_runtime/executor.rs +++ b/core/src/async_runtime/executor.rs @@ -59,11 +59,13 @@ pub fn global_executor() -> Executor { #[cfg(feature = "tokio")] fn init_executor() -> Executor { let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime")); - let ex_cloned = ex.clone(); thread::Builder::new() .name("tokio-executor".to_string()) - .spawn(move || { - catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok(); + .spawn({ + let ex = ex.clone(); + move || { + catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok(); + } }) .expect("cannot spawn tokio runtime thread"); Executor { inner: ex } diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs index 8385982..e425eda 100644 --- a/core/src/async_util/condvar.rs +++ b/core/src/async_util/condvar.rs @@ -24,29 +24,33 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16}; /// let val = Arc::new(Mutex::new(false)); /// let condvar = Arc::new(CondVar::new()); /// -/// let val_cloned = val.clone(); -/// let condvar_cloned = condvar.clone(); -/// spawn(async move { -/// let mut val = val_cloned.lock().await; +/// spawn({ +/// let val = val.clone(); +/// let condvar = condvar.clone(); +/// async move { +/// let mut val = val.lock().await; /// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } +/// // While the boolean flag is false, wait for a signal. +/// while !*val { +/// val = condvar.wait(val).await; +/// } /// -/// // ... +/// // ... +/// } /// }); /// -/// let condvar_cloned = condvar.clone(); -/// spawn(async move { -/// let mut val = val.lock().await; +/// spawn({ +/// let condvar = condvar.clone(); +/// async move { +/// let mut val = val.lock().await; /// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } +/// // While the boolean flag is false, wait for a signal. +/// while !*val { +/// val = condvar.wait(val).await; +/// } /// -/// // ... +/// // ... +/// } /// }); /// /// // Wake up all waiting tasks on this condvar @@ -253,50 +257,54 @@ mod tests { let condvar_full = Arc::new(CondVar::new()); let condvar_empty = Arc::new(CondVar::new()); - let queue_cloned = queue.clone(); - let condvar_full_cloned = condvar_full.clone(); - let condvar_empty_cloned = condvar_empty.clone(); + let _producer1 = spawn({ + let queue = queue.clone(); + let condvar_full = condvar_full.clone(); + let condvar_empty = condvar_empty.clone(); + async move { + for i in 1..number_of_tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar_full.wait(queue).await; + } - let _producer1 = spawn(async move { - for i in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; + queue.items.push_back(format!("task {i}")); - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_full_cloned.wait(queue).await; + // Wake up the consumer + condvar_empty.signal(); } - - queue.items.push_back(format!("task {i}")); - - // Wake up the consumer - condvar_empty_cloned.signal(); } }); - let queue_cloned = queue.clone(); let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - let consumer = spawn(async move { - for _ in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar_empty.wait(queue).await; - } - let _ = queue.items.pop_front().unwrap(); + let consumer = spawn({ + let queue = queue.clone(); + let task_consumed = task_consumed.clone(); + async move { + for _ in 1..number_of_tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-empty + while queue.is_empty() { + // Release queue mutex and sleep + queue = condvar_empty.wait(queue).await; + } + + let _ = queue.items.pop_front().unwrap(); - task_consumed_.fetch_add(1, Ordering::Relaxed); + task_consumed.fetch_add(1, Ordering::Relaxed); - // Do something + // Do something - // Wake up the producer - condvar_full.signal(); + // Wake up the producer + condvar_full.signal(); + } } }); @@ -314,70 +322,76 @@ mod tests { let queue = Arc::new(Mutex::new(Queue::new(5))); let condvar = Arc::new(CondVar::new()); - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer1 = spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } + let _producer1 = spawn({ + let queue = queue.clone(); + let condvar = condvar.clone(); + async move { + for i in 1..tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } - queue.items.push_back(format!("producer1: task {i}")); + queue.items.push_back(format!("producer1: task {i}")); - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } }); - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer2 = spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } + let _producer2 = spawn({ + let queue = queue.clone(); + let condvar = condvar.clone(); + async move { + for i in 1..tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; - queue.items.push_back(format!("producer2: task {i}")); + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } + + queue.items.push_back(format!("producer2: task {i}")); - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } }); - let queue_cloned = queue.clone(); let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - let consumer = spawn(async move { - for _ in 1..((tasks * 2) - 1) { - { - // Lock queue mutex - let mut queue = queue_cloned.lock().await; + let consumer = spawn({ + let queue = queue.clone(); + let task_consumed = task_consumed.clone(); + async move { + for _ in 1..((tasks * 2) - 1) { + { + // Lock queue mutex + let mut queue = queue.lock().await; - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar.wait(queue).await; - } + // Check if the queue is non-empty + while queue.is_empty() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } - let _ = queue.items.pop_front().unwrap(); + let _ = queue.items.pop_front().unwrap(); - task_consumed_.fetch_add(1, Ordering::Relaxed); + task_consumed.fetch_add(1, Ordering::Relaxed); - // Do something + // Do something - // Wake up all producer and consumer tasks - condvar.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } } }); diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs index 76c6a05..b96d979 100644 --- a/core/src/async_util/condwait.rs +++ b/core/src/async_util/condwait.rs @@ -13,10 +13,12 @@ use crate::async_runtime::lock::Mutex; /// /// async { /// let cond_wait = Arc::new(CondWait::new()); -/// let cond_wait_cloned = cond_wait.clone(); -/// let task = spawn(async move { -/// cond_wait_cloned.wait().await; -/// // ... +/// let task = spawn({ +/// let cond_wait = cond_wait.clone(); +/// async move { +/// cond_wait.wait().await; +/// // ... +/// } /// }); /// /// cond_wait.signal().await; @@ -91,12 +93,14 @@ mod tests { let cond_wait = Arc::new(CondWait::new()); let count = Arc::new(AtomicUsize::new(0)); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); // Send a signal to the waiting task @@ -109,20 +113,24 @@ mod tests { assert_eq!(count.load(Ordering::Relaxed), 1); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task1 = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task1 = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task2 = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task2 = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); // Broadcast a signal to all waiting tasks diff --git a/jsonrpc/examples/pubsub_client.rs b/jsonrpc/examples/pubsub_client.rs index 823089d..bb3ce4c 100644 --- a/jsonrpc/examples/pubsub_client.rs +++ b/jsonrpc/examples/pubsub_client.rs @@ -16,8 +16,6 @@ async fn run_client() { .await .expect("Build a client"); - smol::spawn(async move {}).detach(); - let sub = client .subscribe("Calc.log_subscribe", ()) .await diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index 43ce5fc..6f6bd97 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -260,20 +260,26 @@ impl Client { } fn start_background_loop(self: &Arc<Self>, conn: Conn<serde_json::Value>) { - let selfc = self.clone(); - let on_complete = |result: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = result { - error!("Client stopped: {err}"); + let on_complete = { + let this = self.clone(); + |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Client stopped: {err}"); + } + this.disconnect.store(true, Ordering::Relaxed); + this.subscriptions.clear().await; + this.message_dispatcher.clear().await; } - selfc.disconnect.store(true, Ordering::Relaxed); - selfc.subscriptions.clear().await; - selfc.message_dispatcher.clear().await; }; - let selfc = self.clone(); // Spawn a new task - self.task_group - .spawn(selfc.background_loop(conn), on_complete); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.background_loop(conn).await } + }, + on_complete, + ); } async fn background_loop(self: Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> { diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs index 1b38519..48d0139 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/src/error.rs @@ -45,7 +45,7 @@ pub enum Error { ChannelRecv(#[from] async_channel::RecvError), #[error("Channel send Error: {0}")] - ChannelSend(&'static str), + ChannelSend(String), #[error("Unexpected Error: {0}")] General(&'static str), @@ -59,7 +59,7 @@ pub enum Error { impl<T> From<async_channel::SendError<T>> for Error { fn from(error: async_channel::SendError<T>) -> Self { - Error::ChannelSend(error.to_string().leak()) + Error::ChannelSend(error.to_string()) } } @@ -78,7 +78,7 @@ pub enum RPCError { InvalidRequest(&'static str), #[error("Parse Error: {0}")] - ParseError(&'static str), + ParseError(String), #[error("Internal Error")] InternalError, @@ -86,6 +86,6 @@ pub enum RPCError { impl From<serde_json::Error> for RPCError { fn from(error: serde_json::Error) -> Self { - RPCError::ParseError(error.to_string().leak()) + RPCError::ParseError(error.to_string()) } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 8d5cd2c..9d1f5b7 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -77,19 +77,21 @@ impl Server { } }; - let selfc = self.clone(); // Spawns a new task for each new incoming connection self.task_group.spawn( - async move { - loop { - match selfc.listener.accept().await { - Ok(conn) => { - if let Err(err) = selfc.handle_conn(conn).await { - error!("Handle a new connection: {err}") + { + let this = self.clone(); + async move { + loop { + match this.listener.accept().await { + Ok(conn) => { + if let Err(err) = this.handle_conn(conn).await { + error!("Handle a new connection: {err}") + } + } + Err(err) => { + error!("Accept a new connection: {err}") } - } - Err(err) => { - error!("Accept a new connection: {err}") } } } @@ -126,31 +128,33 @@ impl Server { chan.close(); }; - let conn_cloned = conn.clone(); - let queue_cloned = queue.clone(); // Start listening for new responses in the queue or new notifications self.task_group.spawn( - async move { - loop { - // The select function will prioritize the first future if both futures are ready. - // This gives priority to the responses in the response queue. - match select(queue_cloned.recv(), ch_rx.recv()).await { - Either::Left(res) => { - conn_cloned.send(res).await?; - } - Either::Right(notification) => { - let nt = notification?; - let params = Some(serde_json::json!(message::NotificationResult { - subscription: nt.sub_id, - result: Some(nt.result), - })); - let notification = message::Notification { - jsonrpc: message::JSONRPC_VERSION.to_string(), - method: nt.method, - params, - }; - debug!("--> {notification}"); - conn_cloned.send(serde_json::json!(notification)).await?; + { + let conn = conn.clone(); + let queue = queue.clone(); + async move { + loop { + // The select function will prioritize the first future if both futures are ready. + // This gives priority to the responses in the response queue. + match select(queue.recv(), ch_rx.recv()).await { + Either::Left(res) => { + conn.send(res).await?; + } + Either::Right(notification) => { + let nt = notification?; + let params = Some(serde_json::json!(message::NotificationResult { + subscription: nt.sub_id, + result: Some(nt.result), + })); + let notification = message::Notification { + jsonrpc: message::JSONRPC_VERSION.to_string(), + method: nt.method, + params, + }; + debug!("--> {notification}"); + conn.send(serde_json::json!(notification)).await?; + } } } } @@ -169,13 +173,15 @@ impl Server { chan.close(); }; - let selfc = self.clone(); // Spawn a new task and wait for new requests. self.task_group.spawn( - async move { - loop { - let msg = conn.recv().await?; - selfc.new_request(queue.clone(), channel.clone(), msg).await; + { + let this = self.clone(); + async move { + loop { + let msg = conn.recv().await?; + this.new_request(queue.clone(), channel.clone(), msg).await; + } } }, on_complete, @@ -254,15 +260,18 @@ impl Server { error!("Handle a new request: {err}"); } }; - let selfc = self.clone(); + // Spawns a new task for handling the new request, and push the // response to the response queue. self.task_group.spawn( - async move { - let response = selfc.handle_request(channel, msg).await; - debug!("--> {response}"); - queue.push(serde_json::json!(response)).await; - Ok(()) + { + let this = self.clone(); + async move { + let response = this.handle_request(channel, msg).await; + debug!("--> {response}"); + queue.push(serde_json::json!(response)).await; + Ok(()) + } }, on_complete, ); diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 4dd87c5..1ad215c 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -59,14 +59,16 @@ impl ChatProtocol { #[async_trait] impl Protocol for ChatProtocol { async fn start(self: Arc<Self>) -> Result<(), Error> { - let selfc = self.clone(); let stdin = io::stdin(); - let task = self.executor.spawn(async move { - loop { - let mut input = String::new(); - stdin.read_line(&mut input).await.unwrap(); - let msg = format!("> {}: {}", selfc.username, input.trim()); - selfc.peer.broadcast(&Self::id(), &msg).await; + let task = self.executor.spawn({ + let this = self.clone(); + async move { + loop { + let mut input = String::new(); + stdin.read_line(&mut input).await.unwrap(); + let msg = format!("> {}: {}", this.username, input.trim()); + this.peer.broadcast(&Self::id(), &msg).await; + } } }); @@ -126,23 +128,25 @@ fn main() { let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let ex_cloned = ex.clone(); run_executor( - async { - let username = cli.username; + { + let ex = ex.clone(); + async { + let username = cli.username; - // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone().into()); - backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); + // Attach the ChatProtocol + let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into()); + backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); - // Run the backend - backend.run().await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; + // Shutdown the backend + backend.shutdown().await; + } }, ex, ); diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index a44daea..cfa661b 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -127,17 +127,18 @@ impl Connector { { let conn = self.connect(endpoint, peer_id).await?; - let selfc = self.clone(); let endpoint = endpoint.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Outbound connection dropped: {err}"); + let on_disconnect = { + let this = self.clone(); + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + trace!("Outbound connection dropped: {err}"); + } + this.monitor + .notify(ConnEvent::Disconnected(endpoint.clone())) + .await; + this.connection_slots.remove().await; } - selfc - .monitor - .notify(ConnEvent::Disconnected(endpoint.clone())) - .await; - selfc.connection_slots.remove().await; }; self.task_group.spawn(callback(conn), on_disconnect); diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 8e06eef..9ddf614 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -283,11 +283,13 @@ impl LookupService { let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); - let selfc = self.clone(); - let callback = |conn: Conn<NetMsg>| async move { - let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); - timeout(t, selfc.handle_inbound(conn)).await??; - Ok(()) + let callback = { + let this = self.clone(); + |conn: Conn<NetMsg>| async move { + let t = Duration::from_secs(this.config.lookup_connection_lifespan); + timeout(t, this.handle_inbound(conn)).await??; + Ok(()) + } }; self.listener.start(endpoint.clone(), callback).await?; diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index dae4d3f..a9d99d6 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -154,13 +154,17 @@ impl Discovery { } // Start connect loop - let selfc = self.clone(); - self.task_group - .spawn(selfc.connect_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.connect_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -177,10 +181,12 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { - let selfc = self.clone(); - let callback = |c: Conn<NetMsg>| async move { - selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; - Ok(()) + let callback = { + let this = self.clone(); + |c: Conn<NetMsg>| async move { + this.conn_queue.handle(c, ConnDirection::Inbound).await?; + Ok(()) + } }; let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; @@ -212,31 +218,33 @@ impl Discovery { /// Connect to the given endpoint using the connector async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { - let selfc = self.clone(); - let pid_c = pid.clone(); - let endpoint_c = endpoint.clone(); - let cback = |conn: Conn<NetMsg>| async move { - let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - - // If the entry is not in the routing table, ignore the result - let pid = match pid_c { - Some(p) => p, - None => return Ok(()), - }; - - match result { - Err(Error::IncompatiblePeer) => { - error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); - } - Err(Error::PeerAlreadyConnected) => { - selfc.table.update_entry(&pid.0, CONNECTED_ENTRY) + let cback = { + let this = self.clone(); + let endpoint = endpoint.clone(); + let pid = pid.clone(); + |conn: Conn<NetMsg>| async move { + let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await; + + // If the entry is not in the routing table, ignore the result + let pid = match pid { + Some(p) => p, + None => return Ok(()), + }; + + match result { + Err(Error::IncompatiblePeer) => { + error!("Failed to do handshake: {endpoint} incompatible peer"); + this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY); + } + Err(Error::PeerAlreadyConnected) => { + this.table.update_entry(&pid.0, CONNECTED_ENTRY) + } + Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY), + Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY), } - Err(_) => selfc.table.update_entry(&pid.0, UNSTABLE_ENTRY), - Ok(_) => selfc.table.update_entry(&pid.0, DISCONNECTED_ENTRY), - } - Ok(()) + Ok(()) + } }; let result = self diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index c1d222b..b4f5396 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -38,9 +38,6 @@ pub struct RefreshService { /// Managing spawned tasks. task_group: TaskGroup, - /// A global executor - executor: Executor, - /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -65,7 +62,6 @@ impl RefreshService { table, listen_endpoint, task_group: TaskGroup::with_executor(executor.clone()), - executor, config, monitor, } @@ -74,24 +70,32 @@ impl RefreshService { /// Start the refresh service pub async fn start(self: &Arc<Self>) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { - let endpoint = endpoint.read().await; + let endpoint = endpoint.read().await.clone(); - let selfc = self.clone(); - self.task_group - .spawn(selfc.listen_loop(endpoint.clone()), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop(endpoint).await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }); + }, + ); } - let selfc = self.clone(); - self.task_group - .spawn(selfc.refresh_loop(), |res| async move { + self.task_group.spawn( + { + let this = self.clone(); + async move { this.refresh_loop().await } + }, + |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }); + }, + ); Ok(()) } @@ -133,25 +137,22 @@ impl RefreshService { } } - /// Iterates over the entries and spawns a new task for each entry to - /// initiate a connection attempt. + /// Iterates over the entries and initiates a connection. async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) { - let ex = &self.executor; - // Enforce a maximum of 16 concurrent connections. + use futures_util::stream::{FuturesUnordered, StreamExt}; + // Enforce a maximum of 16 connections. for chunk in entries.chunks(16) { - let mut tasks = Vec::new(); + let mut tasks = FuturesUnordered::new(); for bucket_entry in chunk { if bucket_entry.failures >= MAX_FAILURES { self.table.remove_entry(&bucket_entry.entry.key); continue; } - tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) + tasks.push(self.clone().refresh_entry(bucket_entry.clone())) } - for task in tasks { - let _ = task.await; - } + while tasks.next().await.is_some() {} } } diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 923eb18..8a5deaa 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -87,9 +87,13 @@ impl Listener { info!("Start listening on {resolved_endpoint}"); - let selfc = self.clone(); - self.task_group - .spawn(selfc.listen_loop(listener, callback), |_| async {}); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop(listener, callback).await } + }, + |_| async {}, + ); Ok(resolved_endpoint) } @@ -135,16 +139,15 @@ impl Listener { self.connection_slots.add(); - let selfc = self.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - debug!("Inbound connection dropped: {err}"); + let on_disconnect = { + let this = self.clone(); + |res| async move { + if let TaskResult::Completed(Err(err)) = res { + debug!("Inbound connection dropped: {err}"); + } + this.monitor.notify(ConnEvent::Disconnected(endpoint)).await; + this.connection_slots.remove().await; } - selfc - .monitor - .notify(ConnEvent::Disconnected(endpoint)) - .await; - selfc.connection_slots.remove().await; }; let callback = callback.clone(); diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 2068789..6903294 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -209,16 +209,17 @@ impl Peer { self.protocol_ids.write().await.push(protocol_id.clone()); - let selfc = self.clone(); - let proto_idc = protocol_id.clone(); - - let on_failure = |result: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(res) = result { - if res.is_err() { - error!("protocol {} stopped", proto_idc); + let on_failure = { + let this = self.clone(); + let protocol_id = protocol_id.clone(); + |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(res) = result { + if res.is_err() { + error!("protocol {} stopped", protocol_id); + } + // Send a stop signal to read loop + let _ = this.stop_chan.0.try_send(res); } - // Send a stop signal to read loop - let _ = selfc.stop_chan.0.try_send(res); } }; diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 02a74ac..1f3ca55 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -78,11 +78,16 @@ impl PeerPool { }) } - /// Start + /// Starts the [`PeerPool`] pub async fn start(self: &Arc<Self>) -> Result<()> { self.setup_protocols().await?; - let selfc = self.clone(); - self.task_group.spawn(selfc.listen_loop(), |_| async {}); + self.task_group.spawn( + { + let this = self.clone(); + async move { this.listen_loop().await } + }, + |_| async {}, + ); Ok(()) } @@ -145,14 +150,16 @@ impl PeerPool { // Insert the new peer self.peers.write().await.insert(pid.clone(), peer.clone()); - let selfc = self.clone(); - let pid_c = pid.clone(); - let on_disconnect = |result| async move { - if let TaskResult::Completed(result) = result { - if let Err(err) = selfc.remove_peer(&pid_c).await { - error!("Failed to remove peer {pid_c}: {err}"); + let on_disconnect = { + let this = self.clone(); + let pid = pid.clone(); + |result| async move { + if let TaskResult::Completed(result) = result { + if let Err(err) = this.remove_peer(&pid).await { + error!("Failed to remove peer {pid}: {err}"); + } + let _ = disconnect_signal.send(result).await; } - let _ = disconnect_signal.send(result).await; } }; diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index ef1b54e..b800b23 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -128,9 +128,11 @@ impl Protocol for PingProtocol { let (pong_chan, pong_chan_recv) = async_channel::bounded(1); let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1); - let selfc = self.clone(); self.task_group.spawn( - selfc.clone().ping_loop(pong_chan_recv.clone()), + { + let this = self.clone(); + async move { this.ping_loop(pong_chan_recv.clone()).await } + }, |res| async move { if let TaskResult::Completed(result) = res { let _ = stop_signal_s.send(result).await; |