aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2024-03-20 15:20:21 +0100
committerhozan23 <hozan23@proton.me>2024-03-20 15:20:21 +0100
commit379dca552ca91d22ee007b42f93803ad3dc2b274 (patch)
tree1ba10573b6ec5213baf46f6ab9c3767e0daa4eed /core
parent340957fec147f4429796413f27bbd9b84ba6f141 (diff)
core: add the option to create a new task group without providing an
executor & remove GlobalExecutor type
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml2
-rw-r--r--core/src/async_util/executor.rs30
-rw-r--r--core/src/async_util/mod.rs2
-rw-r--r--core/src/async_util/task_group.rs48
-rw-r--r--core/src/lib.rs9
5 files changed, 79 insertions, 12 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 7e7e511..c8e2b8d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -16,6 +16,8 @@ rand = "0.8.5"
thiserror = "1.0.58"
dirs = "5.0.1"
async-task = "4.7.0"
+async-lock = "3.3.0"
+async-process = "2.1.0"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true}
diff --git a/core/src/async_util/executor.rs b/core/src/async_util/executor.rs
new file mode 100644
index 0000000..3e7aa06
--- /dev/null
+++ b/core/src/async_util/executor.rs
@@ -0,0 +1,30 @@
+use std::{panic::catch_unwind, sync::Arc, thread};
+
+use async_lock::OnceCell;
+use smol::Executor as SmolEx;
+
+static GLOBAL_EXECUTOR: OnceCell<Arc<smol::Executor<'_>>> = OnceCell::new();
+
+/// A pointer to an Executor
+pub type Executor<'a> = Arc<SmolEx<'a>>;
+
+/// Returns a single-threaded global executor
+pub(crate) fn global_executor() -> Executor<'static> {
+ fn init_executor() -> Executor<'static> {
+ let ex = smol::Executor::new();
+ thread::Builder::new()
+ .spawn(|| loop {
+ catch_unwind(|| {
+ smol::block_on(global_executor().run(smol::future::pending::<()>()))
+ })
+ .ok();
+ })
+ .expect("cannot spawn executor thread");
+ // Prevent spawning another thread by running the process driver on this
+ // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs
+ ex.spawn(async_process::driver()).detach();
+ Arc::new(ex)
+ }
+
+ GLOBAL_EXECUTOR.get_or_init_blocking(init_executor).clone()
+}
diff --git a/core/src/async_util/mod.rs b/core/src/async_util/mod.rs
index c871bad..2916118 100644
--- a/core/src/async_util/mod.rs
+++ b/core/src/async_util/mod.rs
@@ -1,6 +1,7 @@
mod backoff;
mod condvar;
mod condwait;
+mod executor;
mod select;
mod task_group;
mod timeout;
@@ -8,6 +9,7 @@ mod timeout;
pub use backoff::Backoff;
pub use condvar::CondVar;
pub use condwait::CondWait;
+pub use executor::Executor;
pub use select::{select, Either};
pub use task_group::{TaskGroup, TaskResult};
pub use timeout::timeout;
diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs
index 7129af2..0fb4855 100644
--- a/core/src/async_util/task_group.rs
+++ b/core/src/async_util/task_group.rs
@@ -2,9 +2,7 @@ use std::{future::Future, sync::Arc, sync::Mutex};
use async_task::FallibleTask;
-use crate::Executor;
-
-use super::{select, CondWait, Either};
+use super::{executor::global_executor, select, CondWait, Either, Executor};
/// TaskGroup is a group of spawned tasks.
///
@@ -35,6 +33,19 @@ pub struct TaskGroup<'a> {
executor: Executor<'a>,
}
+impl TaskGroup<'static> {
+ /// Creates a new task group without providing an executor
+ ///
+ /// This will Spawn a task onto a global executor (single-threaded by default).
+ pub fn new_without_executor() -> Self {
+ Self {
+ tasks: Mutex::new(Vec::new()),
+ stop_signal: Arc::new(CondWait::new()),
+ executor: global_executor(),
+ }
+ }
+}
+
impl<'a> TaskGroup<'a> {
/// Creates a new task group
pub fn new(executor: Executor<'a>) -> Self {
@@ -191,4 +202,35 @@ mod tests {
group.cancel().await;
}));
}
+
+ #[test]
+ fn test_task_group_without_executor() {
+ smol::block_on(async {
+ let group = Arc::new(TaskGroup::new_without_executor());
+
+ group.spawn(future::ready(0), |res| async move {
+ assert!(matches!(res, TaskResult::Completed(0)));
+ });
+
+ group.spawn(future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+
+ let groupc = group.clone();
+ group.spawn(
+ async move {
+ groupc.spawn(future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+ },
+ |res| async move {
+ assert!(matches!(res, TaskResult::Completed(_)));
+ },
+ );
+
+ // Do something
+ smol::Timer::after(std::time::Duration::from_millis(50)).await;
+ group.cancel().await;
+ });
+ }
}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 442b642..ae88188 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -18,13 +18,4 @@ pub mod pubsub;
/// Collects common cryptographic tools
pub mod crypto;
-use smol::Executor as SmolEx;
-use std::sync::Arc;
-
-/// A pointer to an Executor
-pub type Executor<'a> = Arc<SmolEx<'a>>;
-
-/// A Global Executor
-pub type GlobalExecutor = Arc<SmolEx<'static>>;
-
use error::Result;