From 5c0abab1b7bf0f2858c451d6f0efc7ca0e138fc6 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 29 Jun 2024 21:16:46 +0200 Subject: use shadown variables to name clones and place them between {} when spawning new tasks --- core/src/async_runtime/executor.rs | 8 +- core/src/async_util/condvar.rs | 208 ++++++++++++++++++++----------------- core/src/async_util/condwait.rs | 52 ++++++---- 3 files changed, 146 insertions(+), 122 deletions(-) (limited to 'core') diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs index 88f6370..e0b707b 100644 --- a/core/src/async_runtime/executor.rs +++ b/core/src/async_runtime/executor.rs @@ -59,11 +59,13 @@ pub fn global_executor() -> Executor { #[cfg(feature = "tokio")] fn init_executor() -> Executor { let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime")); - let ex_cloned = ex.clone(); thread::Builder::new() .name("tokio-executor".to_string()) - .spawn(move || { - catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok(); + .spawn({ + let ex = ex.clone(); + move || { + catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok(); + } }) .expect("cannot spawn tokio runtime thread"); Executor { inner: ex } diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs index 8385982..e425eda 100644 --- a/core/src/async_util/condvar.rs +++ b/core/src/async_util/condvar.rs @@ -24,29 +24,33 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16}; /// let val = Arc::new(Mutex::new(false)); /// let condvar = Arc::new(CondVar::new()); /// -/// let val_cloned = val.clone(); -/// let condvar_cloned = condvar.clone(); -/// spawn(async move { -/// let mut val = val_cloned.lock().await; +/// spawn({ +/// let val = val.clone(); +/// let condvar = condvar.clone(); +/// async move { +/// let mut val = val.lock().await; /// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } +/// // While the boolean flag is false, wait for a signal. +/// while !*val { +/// val = condvar.wait(val).await; +/// } /// -/// // ... +/// // ... +/// } /// }); /// -/// let condvar_cloned = condvar.clone(); -/// spawn(async move { -/// let mut val = val.lock().await; +/// spawn({ +/// let condvar = condvar.clone(); +/// async move { +/// let mut val = val.lock().await; /// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } +/// // While the boolean flag is false, wait for a signal. +/// while !*val { +/// val = condvar.wait(val).await; +/// } /// -/// // ... +/// // ... +/// } /// }); /// /// // Wake up all waiting tasks on this condvar @@ -253,50 +257,54 @@ mod tests { let condvar_full = Arc::new(CondVar::new()); let condvar_empty = Arc::new(CondVar::new()); - let queue_cloned = queue.clone(); - let condvar_full_cloned = condvar_full.clone(); - let condvar_empty_cloned = condvar_empty.clone(); + let _producer1 = spawn({ + let queue = queue.clone(); + let condvar_full = condvar_full.clone(); + let condvar_empty = condvar_empty.clone(); + async move { + for i in 1..number_of_tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar_full.wait(queue).await; + } - let _producer1 = spawn(async move { - for i in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; + queue.items.push_back(format!("task {i}")); - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_full_cloned.wait(queue).await; + // Wake up the consumer + condvar_empty.signal(); } - - queue.items.push_back(format!("task {i}")); - - // Wake up the consumer - condvar_empty_cloned.signal(); } }); - let queue_cloned = queue.clone(); let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - let consumer = spawn(async move { - for _ in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar_empty.wait(queue).await; - } - let _ = queue.items.pop_front().unwrap(); + let consumer = spawn({ + let queue = queue.clone(); + let task_consumed = task_consumed.clone(); + async move { + for _ in 1..number_of_tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-empty + while queue.is_empty() { + // Release queue mutex and sleep + queue = condvar_empty.wait(queue).await; + } + + let _ = queue.items.pop_front().unwrap(); - task_consumed_.fetch_add(1, Ordering::Relaxed); + task_consumed.fetch_add(1, Ordering::Relaxed); - // Do something + // Do something - // Wake up the producer - condvar_full.signal(); + // Wake up the producer + condvar_full.signal(); + } } }); @@ -314,70 +322,76 @@ mod tests { let queue = Arc::new(Mutex::new(Queue::new(5))); let condvar = Arc::new(CondVar::new()); - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer1 = spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } + let _producer1 = spawn({ + let queue = queue.clone(); + let condvar = condvar.clone(); + async move { + for i in 1..tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; + + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } - queue.items.push_back(format!("producer1: task {i}")); + queue.items.push_back(format!("producer1: task {i}")); - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } }); - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer2 = spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } + let _producer2 = spawn({ + let queue = queue.clone(); + let condvar = condvar.clone(); + async move { + for i in 1..tasks { + // Lock queue mtuex + let mut queue = queue.lock().await; - queue.items.push_back(format!("producer2: task {i}")); + // Check if the queue is non-full + while queue.is_full() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } + + queue.items.push_back(format!("producer2: task {i}")); - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } }); - let queue_cloned = queue.clone(); let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - let consumer = spawn(async move { - for _ in 1..((tasks * 2) - 1) { - { - // Lock queue mutex - let mut queue = queue_cloned.lock().await; + let consumer = spawn({ + let queue = queue.clone(); + let task_consumed = task_consumed.clone(); + async move { + for _ in 1..((tasks * 2) - 1) { + { + // Lock queue mutex + let mut queue = queue.lock().await; - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar.wait(queue).await; - } + // Check if the queue is non-empty + while queue.is_empty() { + // Release queue mutex and sleep + queue = condvar.wait(queue).await; + } - let _ = queue.items.pop_front().unwrap(); + let _ = queue.items.pop_front().unwrap(); - task_consumed_.fetch_add(1, Ordering::Relaxed); + task_consumed.fetch_add(1, Ordering::Relaxed); - // Do something + // Do something - // Wake up all producer and consumer tasks - condvar.broadcast(); + // Wake up all producer and consumer tasks + condvar.broadcast(); + } } } }); diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs index 76c6a05..b96d979 100644 --- a/core/src/async_util/condwait.rs +++ b/core/src/async_util/condwait.rs @@ -13,10 +13,12 @@ use crate::async_runtime::lock::Mutex; /// /// async { /// let cond_wait = Arc::new(CondWait::new()); -/// let cond_wait_cloned = cond_wait.clone(); -/// let task = spawn(async move { -/// cond_wait_cloned.wait().await; -/// // ... +/// let task = spawn({ +/// let cond_wait = cond_wait.clone(); +/// async move { +/// cond_wait.wait().await; +/// // ... +/// } /// }); /// /// cond_wait.signal().await; @@ -91,12 +93,14 @@ mod tests { let cond_wait = Arc::new(CondWait::new()); let count = Arc::new(AtomicUsize::new(0)); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); // Send a signal to the waiting task @@ -109,20 +113,24 @@ mod tests { assert_eq!(count.load(Ordering::Relaxed), 1); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task1 = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task1 = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); - let cond_wait_cloned = cond_wait.clone(); - let count_cloned = count.clone(); - let task2 = spawn(async move { - cond_wait_cloned.wait().await; - count_cloned.fetch_add(1, Ordering::Relaxed); - // do something + let task2 = spawn({ + let cond_wait = cond_wait.clone(); + let count = count.clone(); + async move { + cond_wait.wait().await; + count.fetch_add(1, Ordering::Relaxed); + // do something + } }); // Broadcast a signal to all waiting tasks -- cgit v1.2.3