aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/async_runtime/executor.rs8
-rw-r--r--core/src/async_util/condvar.rs208
-rw-r--r--core/src/async_util/condwait.rs52
3 files changed, 146 insertions, 122 deletions
diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs
index 88f6370..e0b707b 100644
--- a/core/src/async_runtime/executor.rs
+++ b/core/src/async_runtime/executor.rs
@@ -59,11 +59,13 @@ pub fn global_executor() -> Executor {
#[cfg(feature = "tokio")]
fn init_executor() -> Executor {
let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime"));
- let ex_cloned = ex.clone();
thread::Builder::new()
.name("tokio-executor".to_string())
- .spawn(move || {
- catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok();
+ .spawn({
+ let ex = ex.clone();
+ move || {
+ catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok();
+ }
})
.expect("cannot spawn tokio runtime thread");
Executor { inner: ex }
diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs
index 8385982..e425eda 100644
--- a/core/src/async_util/condvar.rs
+++ b/core/src/async_util/condvar.rs
@@ -24,29 +24,33 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16};
/// let val = Arc::new(Mutex::new(false));
/// let condvar = Arc::new(CondVar::new());
///
-/// let val_cloned = val.clone();
-/// let condvar_cloned = condvar.clone();
-/// spawn(async move {
-/// let mut val = val_cloned.lock().await;
+/// spawn({
+/// let val = val.clone();
+/// let condvar = condvar.clone();
+/// async move {
+/// let mut val = val.lock().await;
///
-/// // While the boolean flag is false, wait for a signal.
-/// while !*val {
-/// val = condvar_cloned.wait(val).await;
-/// }
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar.wait(val).await;
+/// }
///
-/// // ...
+/// // ...
+/// }
/// });
///
-/// let condvar_cloned = condvar.clone();
-/// spawn(async move {
-/// let mut val = val.lock().await;
+/// spawn({
+/// let condvar = condvar.clone();
+/// async move {
+/// let mut val = val.lock().await;
///
-/// // While the boolean flag is false, wait for a signal.
-/// while !*val {
-/// val = condvar_cloned.wait(val).await;
-/// }
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar.wait(val).await;
+/// }
///
-/// // ...
+/// // ...
+/// }
/// });
///
/// // Wake up all waiting tasks on this condvar
@@ -253,50 +257,54 @@ mod tests {
let condvar_full = Arc::new(CondVar::new());
let condvar_empty = Arc::new(CondVar::new());
- let queue_cloned = queue.clone();
- let condvar_full_cloned = condvar_full.clone();
- let condvar_empty_cloned = condvar_empty.clone();
+ let _producer1 = spawn({
+ let queue = queue.clone();
+ let condvar_full = condvar_full.clone();
+ let condvar_empty = condvar_empty.clone();
+ async move {
+ for i in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar_full.wait(queue).await;
+ }
- let _producer1 = spawn(async move {
- for i in 1..number_of_tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
+ queue.items.push_back(format!("task {i}"));
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_full_cloned.wait(queue).await;
+ // Wake up the consumer
+ condvar_empty.signal();
}
-
- queue.items.push_back(format!("task {i}"));
-
- // Wake up the consumer
- condvar_empty_cloned.signal();
}
});
- let queue_cloned = queue.clone();
let task_consumed = Arc::new(AtomicUsize::new(0));
- let task_consumed_ = task_consumed.clone();
- let consumer = spawn(async move {
- for _ in 1..number_of_tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-empty
- while queue.is_empty() {
- // Release queue mutex and sleep
- queue = condvar_empty.wait(queue).await;
- }
- let _ = queue.items.pop_front().unwrap();
+ let consumer = spawn({
+ let queue = queue.clone();
+ let task_consumed = task_consumed.clone();
+ async move {
+ for _ in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar_empty.wait(queue).await;
+ }
+
+ let _ = queue.items.pop_front().unwrap();
- task_consumed_.fetch_add(1, Ordering::Relaxed);
+ task_consumed.fetch_add(1, Ordering::Relaxed);
- // Do something
+ // Do something
- // Wake up the producer
- condvar_full.signal();
+ // Wake up the producer
+ condvar_full.signal();
+ }
}
});
@@ -314,70 +322,76 @@ mod tests {
let queue = Arc::new(Mutex::new(Queue::new(5)));
let condvar = Arc::new(CondVar::new());
- let queue_cloned = queue.clone();
- let condvar_cloned = condvar.clone();
- let _producer1 = spawn(async move {
- for i in 1..tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_cloned.wait(queue).await;
- }
+ let _producer1 = spawn({
+ let queue = queue.clone();
+ let condvar = condvar.clone();
+ async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
+
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
- queue.items.push_back(format!("producer1: task {i}"));
+ queue.items.push_back(format!("producer1: task {i}"));
- // Wake up all producer and consumer tasks
- condvar_cloned.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
});
- let queue_cloned = queue.clone();
- let condvar_cloned = condvar.clone();
- let _producer2 = spawn(async move {
- for i in 1..tasks {
- // Lock queue mtuex
- let mut queue = queue_cloned.lock().await;
-
- // Check if the queue is non-full
- while queue.is_full() {
- // Release queue mutex and sleep
- queue = condvar_cloned.wait(queue).await;
- }
+ let _producer2 = spawn({
+ let queue = queue.clone();
+ let condvar = condvar.clone();
+ async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue.lock().await;
- queue.items.push_back(format!("producer2: task {i}"));
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
+
+ queue.items.push_back(format!("producer2: task {i}"));
- // Wake up all producer and consumer tasks
- condvar_cloned.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
});
- let queue_cloned = queue.clone();
let task_consumed = Arc::new(AtomicUsize::new(0));
- let task_consumed_ = task_consumed.clone();
- let consumer = spawn(async move {
- for _ in 1..((tasks * 2) - 1) {
- {
- // Lock queue mutex
- let mut queue = queue_cloned.lock().await;
+ let consumer = spawn({
+ let queue = queue.clone();
+ let task_consumed = task_consumed.clone();
+ async move {
+ for _ in 1..((tasks * 2) - 1) {
+ {
+ // Lock queue mutex
+ let mut queue = queue.lock().await;
- // Check if the queue is non-empty
- while queue.is_empty() {
- // Release queue mutex and sleep
- queue = condvar.wait(queue).await;
- }
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
- let _ = queue.items.pop_front().unwrap();
+ let _ = queue.items.pop_front().unwrap();
- task_consumed_.fetch_add(1, Ordering::Relaxed);
+ task_consumed.fetch_add(1, Ordering::Relaxed);
- // Do something
+ // Do something
- // Wake up all producer and consumer tasks
- condvar.broadcast();
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
}
}
});
diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs
index 76c6a05..b96d979 100644
--- a/core/src/async_util/condwait.rs
+++ b/core/src/async_util/condwait.rs
@@ -13,10 +13,12 @@ use crate::async_runtime::lock::Mutex;
///
/// async {
/// let cond_wait = Arc::new(CondWait::new());
-/// let cond_wait_cloned = cond_wait.clone();
-/// let task = spawn(async move {
-/// cond_wait_cloned.wait().await;
-/// // ...
+/// let task = spawn({
+/// let cond_wait = cond_wait.clone();
+/// async move {
+/// cond_wait.wait().await;
+/// // ...
+/// }
/// });
///
/// cond_wait.signal().await;
@@ -91,12 +93,14 @@ mod tests {
let cond_wait = Arc::new(CondWait::new());
let count = Arc::new(AtomicUsize::new(0));
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
// Send a signal to the waiting task
@@ -109,20 +113,24 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1);
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task1 = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task1 = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
- let cond_wait_cloned = cond_wait.clone();
- let count_cloned = count.clone();
- let task2 = spawn(async move {
- cond_wait_cloned.wait().await;
- count_cloned.fetch_add(1, Ordering::Relaxed);
- // do something
+ let task2 = spawn({
+ let cond_wait = cond_wait.clone();
+ let count = count.clone();
+ async move {
+ cond_wait.wait().await;
+ count.fetch_add(1, Ordering::Relaxed);
+ // do something
+ }
});
// Broadcast a signal to all waiting tasks