aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3 /core
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/src/async_utils/task_group.rs73
-rw-r--r--core/src/executor.rs28
-rw-r--r--core/src/lib.rs10
4 files changed, 71 insertions, 41 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index caa3ed5..ab05288 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -15,3 +15,4 @@ chrono = "0.4.30"
rand = "0.8.5"
thiserror = "1.0.47"
dirs = "5.0.1"
+async-task = "4.5.0"
diff --git a/core/src/async_utils/task_group.rs b/core/src/async_utils/task_group.rs
index 8707d0e..afc9648 100644
--- a/core/src/async_utils/task_group.rs
+++ b/core/src/async_utils/task_group.rs
@@ -1,6 +1,6 @@
use std::{future::Future, sync::Arc, sync::Mutex};
-use smol::Task;
+use async_task::FallibleTask;
use crate::Executor;
@@ -19,9 +19,9 @@ use super::{select, CondWait, Either};
/// async {
///
/// let ex = Arc::new(smol::Executor::new());
-/// let group = TaskGroup::new();
+/// let group = TaskGroup::new(ex);
///
-/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {});
+/// group.spawn(smol::Timer::never(), |_| async {});
///
/// group.cancel().await;
///
@@ -29,35 +29,38 @@ use super::{select, CondWait, Either};
///
/// ```
///
-pub struct TaskGroup {
+pub struct TaskGroup<'a> {
tasks: Mutex<Vec<TaskHandler>>,
stop_signal: Arc<CondWait>,
+ executor: Executor<'a>,
}
-impl<'a> TaskGroup {
+impl<'a> TaskGroup<'a> {
/// Creates a new task group
- pub fn new() -> Self {
+ pub fn new(executor: Executor<'a>) -> Self {
Self {
tasks: Mutex::new(Vec::new()),
stop_signal: Arc::new(CondWait::new()),
+ executor,
}
}
/// Spawns a new task and calls the callback after it has completed
/// or been canceled. The callback will have the `TaskResult` as a
/// parameter, indicating whether the task completed or was canceled.
- pub fn spawn<T, Fut, CallbackF, CallbackFut>(
- &self,
- executor: Executor<'a>,
- fut: Fut,
- callback: CallbackF,
- ) where
+ pub fn spawn<T, Fut, CallbackF, CallbackFut>(&self, fut: Fut, callback: CallbackF)
+ where
T: Send + Sync + 'a,
Fut: Future<Output = T> + Send + 'a,
CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a,
CallbackFut: Future<Output = ()> + Send + 'a,
{
- let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone());
+ let task = TaskHandler::new(
+ self.executor.clone(),
+ fut,
+ callback,
+ self.stop_signal.clone(),
+ );
self.tasks.lock().unwrap().push(task);
}
@@ -86,12 +89,6 @@ impl<'a> TaskGroup {
}
}
-impl Default for TaskGroup {
- fn default() -> Self {
- Self::new()
- }
-}
-
/// The result of a spawned task.
#[derive(Debug)]
pub enum TaskResult<T> {
@@ -110,7 +107,7 @@ impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> {
/// TaskHandler
pub struct TaskHandler {
- task: Task<()>,
+ task: FallibleTask<()>,
cancel_flag: Arc<CondWait>,
}
@@ -130,21 +127,23 @@ impl<'a> TaskHandler {
{
let cancel_flag = Arc::new(CondWait::new());
let cancel_flag_c = cancel_flag.clone();
- let task = ex.spawn(async move {
- //start_signal.signal().await;
- // Waits for either the stop signal or the task to complete.
- let result = select(stop_signal.wait(), fut).await;
+ let task = ex
+ .spawn(async move {
+ //start_signal.signal().await;
+ // Waits for either the stop signal or the task to complete.
+ let result = select(stop_signal.wait(), fut).await;
- let result = match result {
- Either::Left(_) => TaskResult::Cancelled,
- Either::Right(res) => TaskResult::Completed(res),
- };
+ let result = match result {
+ Either::Left(_) => TaskResult::Cancelled,
+ Either::Right(res) => TaskResult::Completed(res),
+ };
- // Call the callback with the result.
- callback(result).await;
+ // Call the callback with the result.
+ callback(result).await;
- cancel_flag_c.signal().await;
- });
+ cancel_flag_c.signal().await;
+ })
+ .fallible();
TaskHandler { task, cancel_flag }
}
@@ -165,22 +164,20 @@ mod tests {
fn test_task_group() {
let ex = Arc::new(smol::Executor::new());
smol::block_on(ex.clone().run(async move {
- let group = Arc::new(TaskGroup::new());
+ let group = Arc::new(TaskGroup::new(ex));
- group.spawn(ex.clone(), future::ready(0), |res| async move {
+ group.spawn(future::ready(0), |res| async move {
assert!(matches!(res, TaskResult::Completed(0)));
});
- group.spawn(ex.clone(), future::pending::<()>(), |res| async move {
+ group.spawn(future::pending::<()>(), |res| async move {
assert!(matches!(res, TaskResult::Cancelled));
});
let groupc = group.clone();
- let exc = ex.clone();
group.spawn(
- ex.clone(),
async move {
- groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move {
+ groupc.spawn(future::pending::<()>(), |res| async move {
assert!(matches!(res, TaskResult::Cancelled));
});
},
diff --git a/core/src/executor.rs b/core/src/executor.rs
new file mode 100644
index 0000000..136f6ea
--- /dev/null
+++ b/core/src/executor.rs
@@ -0,0 +1,28 @@
+
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// see `std::thread::available_parallelism`
+fn available_parallelism() -> usize {
+ thread::available_parallelism()
+ .map(NonZeroUsize::get)
+ .unwrap_or(1)
+}
+
+/// Run a multi-threaded executor
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) {
+ let (signal, shutdown) = channel::unbounded::<()>();
+
+ let num_threads = available_parallelism();
+
+ Parallel::new()
+ .each(0..(num_threads), |_| {
+ future::block_on(ex.run(shutdown.recv()))
+ })
+ // Run the main future on the current thread.
+ .finish(|| {
+ future::block_on(async {
+ main_future.await;
+ drop(signal);
+ })
+ });
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 83af888..fef7459 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -4,7 +4,7 @@ pub mod utils;
/// A module containing async utilities that work with the `smol` async runtime.
pub mod async_utils;
-/// Represents Karyons's Core Error.
+/// Represents karyons's Core Error.
pub mod error;
/// [`EventSys`](./event/struct.EventSys.html) Implementation
@@ -13,9 +13,13 @@ pub mod event;
/// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html)
pub mod pubsub;
-use error::Result;
use smol::Executor as SmolEx;
use std::sync::Arc;
-/// A wrapper for smol::Executor
+/// A pointer to an Executor
pub type Executor<'a> = Arc<SmolEx<'a>>;
+
+/// A Global Executor
+pub type GlobalExecutor = Arc<SmolEx<'static>>;
+
+use error::Result;