diff options
author | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
commit | 78884caca030104557ca277dd3a41cefb70f5be8 (patch) | |
tree | c33650dfe44a219e395dff1966d298b58b09acb3 /core | |
parent | f0729022589ee8e48b5558ab30462f95d06fe6df (diff) |
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling
its spawn method
also, define a global executor `Executor<'static>` and use static
lifetime instead of a lifetime placeholder
This improvement simplify the code for spawning a new task. There is no
need to pass the executor around.
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 1 | ||||
-rw-r--r-- | core/src/async_utils/task_group.rs | 73 | ||||
-rw-r--r-- | core/src/executor.rs | 28 | ||||
-rw-r--r-- | core/src/lib.rs | 10 |
4 files changed, 71 insertions, 41 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index caa3ed5..ab05288 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,3 +15,4 @@ chrono = "0.4.30" rand = "0.8.5" thiserror = "1.0.47" dirs = "5.0.1" +async-task = "4.5.0" diff --git a/core/src/async_utils/task_group.rs b/core/src/async_utils/task_group.rs index 8707d0e..afc9648 100644 --- a/core/src/async_utils/task_group.rs +++ b/core/src/async_utils/task_group.rs @@ -1,6 +1,6 @@ use std::{future::Future, sync::Arc, sync::Mutex}; -use smol::Task; +use async_task::FallibleTask; use crate::Executor; @@ -19,9 +19,9 @@ use super::{select, CondWait, Either}; /// async { /// /// let ex = Arc::new(smol::Executor::new()); -/// let group = TaskGroup::new(); +/// let group = TaskGroup::new(ex); /// -/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {}); +/// group.spawn(smol::Timer::never(), |_| async {}); /// /// group.cancel().await; /// @@ -29,35 +29,38 @@ use super::{select, CondWait, Either}; /// /// ``` /// -pub struct TaskGroup { +pub struct TaskGroup<'a> { tasks: Mutex<Vec<TaskHandler>>, stop_signal: Arc<CondWait>, + executor: Executor<'a>, } -impl<'a> TaskGroup { +impl<'a> TaskGroup<'a> { /// Creates a new task group - pub fn new() -> Self { + pub fn new(executor: Executor<'a>) -> Self { Self { tasks: Mutex::new(Vec::new()), stop_signal: Arc::new(CondWait::new()), + executor, } } /// Spawns a new task and calls the callback after it has completed /// or been canceled. The callback will have the `TaskResult` as a /// parameter, indicating whether the task completed or was canceled. - pub fn spawn<T, Fut, CallbackF, CallbackFut>( - &self, - executor: Executor<'a>, - fut: Fut, - callback: CallbackF, - ) where + pub fn spawn<T, Fut, CallbackF, CallbackFut>(&self, fut: Fut, callback: CallbackF) + where T: Send + Sync + 'a, Fut: Future<Output = T> + Send + 'a, CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, CallbackFut: Future<Output = ()> + Send + 'a, { - let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone()); + let task = TaskHandler::new( + self.executor.clone(), + fut, + callback, + self.stop_signal.clone(), + ); self.tasks.lock().unwrap().push(task); } @@ -86,12 +89,6 @@ impl<'a> TaskGroup { } } -impl Default for TaskGroup { - fn default() -> Self { - Self::new() - } -} - /// The result of a spawned task. #[derive(Debug)] pub enum TaskResult<T> { @@ -110,7 +107,7 @@ impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> { /// TaskHandler pub struct TaskHandler { - task: Task<()>, + task: FallibleTask<()>, cancel_flag: Arc<CondWait>, } @@ -130,21 +127,23 @@ impl<'a> TaskHandler { { let cancel_flag = Arc::new(CondWait::new()); let cancel_flag_c = cancel_flag.clone(); - let task = ex.spawn(async move { - //start_signal.signal().await; - // Waits for either the stop signal or the task to complete. - let result = select(stop_signal.wait(), fut).await; + let task = ex + .spawn(async move { + //start_signal.signal().await; + // Waits for either the stop signal or the task to complete. + let result = select(stop_signal.wait(), fut).await; - let result = match result { - Either::Left(_) => TaskResult::Cancelled, - Either::Right(res) => TaskResult::Completed(res), - }; + let result = match result { + Either::Left(_) => TaskResult::Cancelled, + Either::Right(res) => TaskResult::Completed(res), + }; - // Call the callback with the result. - callback(result).await; + // Call the callback with the result. + callback(result).await; - cancel_flag_c.signal().await; - }); + cancel_flag_c.signal().await; + }) + .fallible(); TaskHandler { task, cancel_flag } } @@ -165,22 +164,20 @@ mod tests { fn test_task_group() { let ex = Arc::new(smol::Executor::new()); smol::block_on(ex.clone().run(async move { - let group = Arc::new(TaskGroup::new()); + let group = Arc::new(TaskGroup::new(ex)); - group.spawn(ex.clone(), future::ready(0), |res| async move { + group.spawn(future::ready(0), |res| async move { assert!(matches!(res, TaskResult::Completed(0))); }); - group.spawn(ex.clone(), future::pending::<()>(), |res| async move { + group.spawn(future::pending::<()>(), |res| async move { assert!(matches!(res, TaskResult::Cancelled)); }); let groupc = group.clone(); - let exc = ex.clone(); group.spawn( - ex.clone(), async move { - groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move { + groupc.spawn(future::pending::<()>(), |res| async move { assert!(matches!(res, TaskResult::Cancelled)); }); }, diff --git a/core/src/executor.rs b/core/src/executor.rs new file mode 100644 index 0000000..136f6ea --- /dev/null +++ b/core/src/executor.rs @@ -0,0 +1,28 @@ + + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +fn available_parallelism() -> usize { + thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) { + let (signal, shutdown) = channel::unbounded::<()>(); + + let num_threads = available_parallelism(); + + Parallel::new() + .each(0..(num_threads), |_| { + future::block_on(ex.run(shutdown.recv())) + }) + // Run the main future on the current thread. + .finish(|| { + future::block_on(async { + main_future.await; + drop(signal); + }) + }); +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 83af888..fef7459 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -4,7 +4,7 @@ pub mod utils; /// A module containing async utilities that work with the `smol` async runtime. pub mod async_utils; -/// Represents Karyons's Core Error. +/// Represents karyons's Core Error. pub mod error; /// [`EventSys`](./event/struct.EventSys.html) Implementation @@ -13,9 +13,13 @@ pub mod event; /// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html) pub mod pubsub; -use error::Result; use smol::Executor as SmolEx; use std::sync::Arc; -/// A wrapper for smol::Executor +/// A pointer to an Executor pub type Executor<'a> = Arc<SmolEx<'a>>; + +/// A Global Executor +pub type GlobalExecutor = Arc<SmolEx<'static>>; + +use error::Result; |