diff --git a/core/Cargo.toml b/core/Cargo.toml
new file mode 100644
index 0000000..caa3ed5
--- /dev/null
+++ b/core/Cargo.toml
@@ -0,0 +1,17 @@
+name = "karyons_core"
+version.workspace = true
+edition.workspace = true
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+smol = "1.3.0"
+pin-project-lite = "0.2.13"
+log = "0.4.20"
+bincode = { version="2.0.0-rc.3", features = ["derive"]}
+chrono = "0.4.30"
+rand = "0.8.5"
+thiserror = "1.0.47"
+dirs = "5.0.1"
diff --git a/core/src/async_utils/backoff.rs b/core/src/async_utils/backoff.rs
new file mode 100644
index 0000000..f7e131d
--- /dev/null
+++ b/core/src/async_utils/backoff.rs
@@ -0,0 +1,115 @@
+use std::{
+ cmp::min,
+ sync::atomic::{AtomicBool, AtomicU32, Ordering},
+ time::Duration,
+use smol::Timer;
+/// Exponential backoff
+/// <https://en.wikipedia.org/wiki/Exponential_backoff>
+/// # Examples
+/// ```
+/// use karyons_core::async_utils::Backoff;
+/// async {
+/// let backoff = Backoff::new(300, 3000);
+/// loop {
+/// backoff.sleep().await;
+/// // do something
+/// break;
+/// }
+/// backoff.reset();
+/// // ....
+/// };
+/// ```
+pub struct Backoff {
+ /// The base delay in milliseconds for the initial retry.
+ base_delay: u64,
+ /// The max delay in milliseconds allowed for a retry.
+ max_delay: u64,
+ /// Atomic counter
+ retries: AtomicU32,
+ /// Stop flag
+ stop: AtomicBool,
+impl Backoff {
+ /// Creates a new Backoff.
+ pub fn new(base_delay: u64, max_delay: u64) -> Self {
+ Self {
+ base_delay,
+ max_delay,
+ retries: AtomicU32::new(0),
+ stop: AtomicBool::new(false),
+ }
+ }
+ /// Sleep based on the current retry count and delay values.
+ /// Retruns the delay value.
+ pub async fn sleep(&self) -> u64 {
+ if self.stop.load(Ordering::SeqCst) {
+ Timer::after(Duration::from_millis(self.max_delay)).await;
+ return self.max_delay;
+ }
+ let retries = self.retries.load(Ordering::SeqCst);
+ let delay = self.base_delay * (2_u64).pow(retries);
+ let delay = min(delay, self.max_delay);
+ if delay == self.max_delay {
+ self.stop.store(true, Ordering::SeqCst);
+ }
+ self.retries.store(retries + 1, Ordering::SeqCst);
+ Timer::after(Duration::from_millis(delay)).await;
+ delay
+ }
+ /// Reset the retry counter to 0.
+ pub fn reset(&self) {
+ self.retries.store(0, Ordering::SeqCst);
+ self.stop.store(false, Ordering::SeqCst);
+ }
+mod tests {
+ use super::*;
+ use std::sync::Arc;
+ #[test]
+ fn test_backoff() {
+ smol::block_on(async move {
+ let backoff = Arc::new(Backoff::new(5, 15));
+ let backoff_c = backoff.clone();
+ smol::spawn(async move {
+ let delay = backoff_c.sleep().await;
+ assert_eq!(delay, 5);
+ let delay = backoff_c.sleep().await;
+ assert_eq!(delay, 10);
+ let delay = backoff_c.sleep().await;
+ assert_eq!(delay, 15);
+ })
+ .await;
+ smol::spawn(async move {
+ backoff.reset();
+ let delay = backoff.sleep().await;
+ assert_eq!(delay, 5);
+ })
+ .await;
+ });
+ }
diff --git a/core/src/async_utils/condvar.rs b/core/src/async_utils/condvar.rs
new file mode 100644
index 0000000..814f78f
--- /dev/null
+++ b/core/src/async_utils/condvar.rs
@@ -0,0 +1,387 @@
+use std::{
+ collections::HashMap,
+ future::Future,
+ pin::Pin,
+ sync::Mutex,
+ task::{Context, Poll, Waker},
+use smol::lock::MutexGuard;
+use crate::utils::random_16;
+/// CondVar is an async version of <https://doc.rust-lang.org/std/sync/struct.Condvar.html>
+/// # Example
+/// use std::sync::Arc;
+/// use smol::lock::Mutex;
+/// use karyons_core::async_utils::CondVar;
+/// async {
+/// let val = Arc::new(Mutex::new(false));
+/// let condvar = Arc::new(CondVar::new());
+/// let val_cloned = val.clone();
+/// let condvar_cloned = condvar.clone();
+/// smol::spawn(async move {
+/// let mut val = val_cloned.lock().await;
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar_cloned.wait(val).await;
+/// }
+/// // ...
+/// });
+/// let condvar_cloned = condvar.clone();
+/// smol::spawn(async move {
+/// let mut val = val.lock().await;
+/// // While the boolean flag is false, wait for a signal.
+/// while !*val {
+/// val = condvar_cloned.wait(val).await;
+/// }
+/// // ...
+/// });
+/// // Wake up all waiting tasks on this condvar
+/// condvar.broadcast();
+/// };
+/// ```
+pub struct CondVar {
+ inner: Mutex<Wakers>,
+impl CondVar {
+ /// Creates a new CondVar
+ pub fn new() -> Self {
+ Self {
+ inner: Mutex::new(Wakers::new()),
+ }
+ }
+ /// Blocks the current task until this condition variable receives a notification.
+ pub async fn wait<'a, T>(&self, g: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
+ let m = MutexGuard::source(&g);
+ CondVarAwait::new(self, g).await;
+ m.lock().await
+ }
+ /// Wakes up one blocked task waiting on this condvar.
+ pub fn signal(&self) {
+ self.inner.lock().unwrap().wake(true);
+ }
+ /// Wakes up all blocked tasks waiting on this condvar.
+ pub fn broadcast(&self) {
+ self.inner.lock().unwrap().wake(false);
+ }
+impl Default for CondVar {
+ fn default() -> Self {
+ Self::new()
+ }
+struct CondVarAwait<'a, T> {
+ id: Option<u16>,
+ condvar: &'a CondVar,
+ guard: Option<MutexGuard<'a, T>>,
+impl<'a, T> CondVarAwait<'a, T> {
+ fn new(condvar: &'a CondVar, guard: MutexGuard<'a, T>) -> Self {
+ Self {
+ condvar,
+ guard: Some(guard),
+ id: None,
+ }
+ }
+impl<'a, T> Future for CondVarAwait<'a, T> {
+ type Output = ();
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut inner = self.condvar.inner.lock().unwrap();
+ match self.guard.take() {
+ Some(_) => {
+ // the first pooll will release the Mutexguard
+ self.id = Some(inner.put(Some(cx.waker().clone())));
+ Poll::Pending
+ }
+ None => {
+ // Return Ready if it has already been polled and removed
+ // from the waker list.
+ if self.id.is_none() {
+ return Poll::Ready(());
+ }
+ let i = self.id.as_ref().unwrap();
+ match inner.wakers.get_mut(i).unwrap() {
+ Some(wk) => {
+ // This will prevent cloning again
+ if !wk.will_wake(cx.waker()) {
+ *wk = cx.waker().clone();
+ }
+ Poll::Pending
+ }
+ None => {
+ inner.delete(i);
+ self.id = None;
+ Poll::Ready(())
+ }
+ }
+ }
+ }
+ }
+impl<'a, T> Drop for CondVarAwait<'a, T> {
+ fn drop(&mut self) {
+ if let Some(id) = self.id {
+ let mut inner = self.condvar.inner.lock().unwrap();
+ if let Some(wk) = inner.wakers.get_mut(&id).unwrap().take() {
+ wk.wake()
+ }
+ }
+ }
+/// Wakers is a helper struct to store the task wakers
+struct Wakers {
+ wakers: HashMap<u16, Option<Waker>>,
+impl Wakers {
+ fn new() -> Self {
+ Self {
+ wakers: HashMap::new(),
+ }
+ }
+ fn put(&mut self, waker: Option<Waker>) -> u16 {
+ let mut id: u16;
+ id = random_16();
+ while self.wakers.contains_key(&id) {
+ id = random_16();
+ }
+ self.wakers.insert(id, waker);
+ id
+ }
+ fn delete(&mut self, id: &u16) -> Option<Option<Waker>> {
+ self.wakers.remove(id)
+ }
+ fn wake(&mut self, signal: bool) {
+ for (_, wk) in self.wakers.iter_mut() {
+ match wk.take() {
+ Some(w) => {
+ w.wake();
+ if signal {
+ break;
+ }
+ }
+ None => continue,
+ }
+ }
+ }
+mod tests {
+ use super::*;
+ use smol::lock::Mutex;
+ use std::{
+ collections::VecDeque,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+ };
+ // The tests below demonstrate a solution to a problem in the Wikipedia
+ // explanation of condition variables:
+ // https://en.wikipedia.org/wiki/Monitor_(synchronization)#Solving_the_bounded_producer/consumer_problem.
+ struct Queue {
+ items: VecDeque<String>,
+ max_len: usize,
+ }
+ impl Queue {
+ fn new(max_len: usize) -> Self {
+ Self {
+ items: VecDeque::new(),
+ max_len,
+ }
+ }
+ fn is_full(&self) -> bool {
+ self.items.len() == self.max_len
+ }
+ fn is_empty(&self) -> bool {
+ self.items.is_empty()
+ }
+ }
+ #[test]
+ fn test_condvar_signal() {
+ smol::block_on(async {
+ let number_of_tasks = 30;
+ let queue = Arc::new(Mutex::new(Queue::new(5)));
+ let condvar_full = Arc::new(CondVar::new());
+ let condvar_empty = Arc::new(CondVar::new());
+ let queue_cloned = queue.clone();
+ let condvar_full_cloned = condvar_full.clone();
+ let condvar_empty_cloned = condvar_empty.clone();
+ let _producer1 = smol::spawn(async move {
+ for i in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue_cloned.lock().await;
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar_full_cloned.wait(queue).await;
+ }
+ queue.items.push_back(format!("task {i}"));
+ // Wake up the consumer
+ condvar_empty_cloned.signal();
+ }
+ });
+ let queue_cloned = queue.clone();
+ let task_consumed = Arc::new(AtomicUsize::new(0));
+ let task_consumed_ = task_consumed.clone();
+ let consumer = smol::spawn(async move {
+ for _ in 1..number_of_tasks {
+ // Lock queue mtuex
+ let mut queue = queue_cloned.lock().await;
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar_empty.wait(queue).await;
+ }
+ let _ = queue.items.pop_front().unwrap();
+ task_consumed_.fetch_add(1, Ordering::Relaxed);
+ // Do something
+ // Wake up the producer
+ condvar_full.signal();
+ }
+ });
+ consumer.await;
+ assert!(queue.lock().await.is_empty());
+ assert_eq!(task_consumed.load(Ordering::Relaxed), 29);
+ });
+ }
+ #[test]
+ fn test_condvar_broadcast() {
+ smol::block_on(async {
+ let tasks = 30;
+ let queue = Arc::new(Mutex::new(Queue::new(5)));
+ let condvar = Arc::new(CondVar::new());
+ let queue_cloned = queue.clone();
+ let condvar_cloned = condvar.clone();
+ let _producer1 = smol::spawn(async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue_cloned.lock().await;
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar_cloned.wait(queue).await;
+ }
+ queue.items.push_back(format!("producer1: task {i}"));
+ // Wake up all producer and consumer tasks
+ condvar_cloned.broadcast();
+ }
+ });
+ let queue_cloned = queue.clone();
+ let condvar_cloned = condvar.clone();
+ let _producer2 = smol::spawn(async move {
+ for i in 1..tasks {
+ // Lock queue mtuex
+ let mut queue = queue_cloned.lock().await;
+ // Check if the queue is non-full
+ while queue.is_full() {
+ // Release queue mutex and sleep
+ queue = condvar_cloned.wait(queue).await;
+ }
+ queue.items.push_back(format!("producer2: task {i}"));
+ // Wake up all producer and consumer tasks
+ condvar_cloned.broadcast();
+ }
+ });
+ let queue_cloned = queue.clone();
+ let task_consumed = Arc::new(AtomicUsize::new(0));
+ let task_consumed_ = task_consumed.clone();
+ let consumer = smol::spawn(async move {
+ for _ in 1..((tasks * 2) - 1) {
+ {
+ // Lock queue mutex
+ let mut queue = queue_cloned.lock().await;
+ // Check if the queue is non-empty
+ while queue.is_empty() {
+ // Release queue mutex and sleep
+ queue = condvar.wait(queue).await;
+ }
+ let _ = queue.items.pop_front().unwrap();
+ task_consumed_.fetch_add(1, Ordering::Relaxed);
+ // Do something
+ // Wake up all producer and consumer tasks
+ condvar.broadcast();
+ }
+ }
+ });
+ consumer.await;
+ assert!(queue.lock().await.is_empty());
+ assert_eq!(task_consumed.load(Ordering::Relaxed), 58);
+ });
+ }
diff --git a/core/src/async_utils/condwait.rs b/core/src/async_utils/condwait.rs
new file mode 100644
index 0000000..f16a99e
--- /dev/null
+++ b/core/src/async_utils/condwait.rs
@@ -0,0 +1,96 @@
+use smol::lock::Mutex;
+use super::CondVar;
+/// CondWait is a wrapper struct for CondVar with a Mutex boolean flag.
+/// # Example
+/// use std::sync::Arc;
+/// use karyons_core::async_utils::CondWait;
+/// async {
+/// let cond_wait = Arc::new(CondWait::new());
+/// let cond_wait_cloned = cond_wait.clone();
+/// let task = smol::spawn(async move {
+/// cond_wait_cloned.wait().await;
+/// // ...
+/// });
+/// cond_wait.signal().await;
+/// };
+/// ```
+pub struct CondWait {
+ /// The CondVar
+ condvar: CondVar,
+ /// Boolean flag
+ w: Mutex<bool>,
+impl CondWait {
+ /// Creates a new CondWait.
+ pub fn new() -> Self {
+ Self {
+ condvar: CondVar::new(),
+ w: Mutex::new(false),
+ }
+ }
+ /// Waits for a signal or broadcast.
+ pub async fn wait(&self) {
+ let mut w = self.w.lock().await;
+ // While the boolean flag is false, wait for a signal.
+ while !*w {
+ w = self.condvar.wait(w).await;
+ }
+ }
+ /// Signal a waiting task.
+ pub async fn signal(&self) {
+ *self.w.lock().await = true;
+ self.condvar.signal();
+ }
+ /// Signal all waiting tasks.
+ pub async fn broadcast(&self) {
+ *self.w.lock().await = true;
+ self.condvar.broadcast();
+ }
+ /// Reset the boolean flag value to false.
+ pub async fn reset(&self) {
+ *self.w.lock().await = false;
+ }
+impl Default for CondWait {
+ fn default() -> Self {
+ Self::new()
+ }
+mod tests {
+ use super::*;
+ use std::sync::Arc;
+ #[test]
+ fn test_cond_wait() {
+ smol::block_on(async {
+ let cond_wait = Arc::new(CondWait::new());
+ let cond_wait_cloned = cond_wait.clone();
+ let task = smol::spawn(async move {
+ cond_wait_cloned.wait().await;
+ true
+ });
+ cond_wait.signal().await;
+ assert!(task.await);
+ });
+ }
diff --git a/core/src/async_utils/mod.rs b/core/src/async_utils/mod.rs
new file mode 100644
index 0000000..c871bad
--- /dev/null
+++ b/core/src/async_utils/mod.rs
@@ -0,0 +1,13 @@
+mod backoff;
+mod condvar;
+mod condwait;
+mod select;
+mod task_group;
+mod timeout;
+pub use backoff::Backoff;
+pub use condvar::CondVar;
+pub use condwait::CondWait;
+pub use select::{select, Either};
+pub use task_group::{TaskGroup, TaskResult};
+pub use timeout::timeout;
diff --git a/core/src/async_utils/select.rs b/core/src/async_utils/select.rs
new file mode 100644
index 0000000..d61b355
--- /dev/null
+++ b/core/src/async_utils/select.rs
@@ -0,0 +1,99 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use smol::future::Future;
+/// Returns the result of the future that completes first, preferring future1
+/// if both are ready.
+/// # Examples
+/// ```
+/// use std::future;
+/// use karyons_core::async_utils::{select, Either};
+/// async {
+/// let fut1 = future::pending::<String>();
+/// let fut2 = future::ready(0);
+/// let res = select(fut1, fut2).await;
+/// assert!(matches!(res, Either::Right(0)));
+/// // ....
+/// };
+/// ```
+pub fn select<T1, T2, F1, F2>(future1: F1, future2: F2) -> Select<F1, F2>
+ F1: Future<Output = T1>,
+ F2: Future<Output = T2>,
+ Select { future1, future2 }
+pin_project! {
+ #[derive(Debug)]
+ pub struct Select<F1, F2> {
+ #[pin]
+ future1: F1,
+ #[pin]
+ future2: F2,
+ }
+/// The return value from the `select` function, indicating which future
+/// completed first.
+pub enum Either<T1, T2> {
+ Left(T1),
+ Right(T2),
+// Implement the Future trait for the Select struct.
+impl<T1, T2, F1, F2> Future for Select<F1, F2>
+ F1: Future<Output = T1>,
+ F2: Future<Output = T2>,
+ type Output = Either<T1, T2>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.project();
+ if let Poll::Ready(t) = this.future1.poll(cx) {
+ return Poll::Ready(Either::Left(t));
+ }
+ if let Poll::Ready(t) = this.future2.poll(cx) {
+ return Poll::Ready(Either::Right(t));
+ }
+ Poll::Pending
+ }
+mod tests {
+ use super::{select, Either};
+ use smol::Timer;
+ use std::future;
+ #[test]
+ fn test_async_select() {
+ smol::block_on(async move {
+ let fut = select(Timer::never(), future::ready(0 as u32)).await;
+ assert!(matches!(fut, Either::Right(0)));
+ let fut1 = future::pending::<String>();
+ let fut2 = future::ready(0);
+ let res = select(fut1, fut2).await;
+ assert!(matches!(res, Either::Right(0)));
+ let fut1 = future::ready(0);
+ let fut2 = future::pending::<String>();
+ let res = select(fut1, fut2).await;
+ assert!(matches!(res, Either::Left(_)));
+ });
+ }
diff --git a/core/src/async_utils/task_group.rs b/core/src/async_utils/task_group.rs
new file mode 100644
index 0000000..8707d0e
--- /dev/null
+++ b/core/src/async_utils/task_group.rs
@@ -0,0 +1,197 @@
+use std::{future::Future, sync::Arc, sync::Mutex};
+use smol::Task;
+use crate::Executor;
+use super::{select, CondWait, Either};
+/// TaskGroup is a group of spawned tasks.
+/// # Example
+/// ```
+/// use std::sync::Arc;
+/// use karyons_core::async_utils::TaskGroup;
+/// async {
+/// let ex = Arc::new(smol::Executor::new());
+/// let group = TaskGroup::new();
+/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {});
+/// group.cancel().await;
+/// };
+/// ```
+pub struct TaskGroup {
+ tasks: Mutex<Vec<TaskHandler>>,
+ stop_signal: Arc<CondWait>,
+impl<'a> TaskGroup {
+ /// Creates a new task group
+ pub fn new() -> Self {
+ Self {
+ tasks: Mutex::new(Vec::new()),
+ stop_signal: Arc::new(CondWait::new()),
+ }
+ }
+ /// Spawns a new task and calls the callback after it has completed
+ /// or been canceled. The callback will have the `TaskResult` as a
+ /// parameter, indicating whether the task completed or was canceled.
+ pub fn spawn<T, Fut, CallbackF, CallbackFut>(
+ &self,
+ executor: Executor<'a>,
+ fut: Fut,
+ callback: CallbackF,
+ ) where
+ T: Send + Sync + 'a,
+ Fut: Future<Output = T> + Send + 'a,
+ CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a,
+ CallbackFut: Future<Output = ()> + Send + 'a,
+ {
+ let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone());
+ self.tasks.lock().unwrap().push(task);
+ }
+ /// Checks if the task group is empty.
+ pub fn is_empty(&self) -> bool {
+ self.tasks.lock().unwrap().is_empty()
+ }
+ /// Get the number of the tasks in the group.
+ pub fn len(&self) -> usize {
+ self.tasks.lock().unwrap().len()
+ }
+ /// Cancels all tasks in the group.
+ pub async fn cancel(&self) {
+ self.stop_signal.broadcast().await;
+ loop {
+ let task = self.tasks.lock().unwrap().pop();
+ if let Some(t) = task {
+ t.cancel().await
+ } else {
+ break;
+ }
+ }
+ }
+impl Default for TaskGroup {
+ fn default() -> Self {
+ Self::new()
+ }
+/// The result of a spawned task.
+pub enum TaskResult<T> {
+ Completed(T),
+ Cancelled,
+impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match self {
+ TaskResult::Cancelled => write!(f, "Task cancelled"),
+ TaskResult::Completed(res) => write!(f, "Task completed: {:?}", res),
+ }
+ }
+/// TaskHandler
+pub struct TaskHandler {
+ task: Task<()>,
+ cancel_flag: Arc<CondWait>,
+impl<'a> TaskHandler {
+ /// Creates a new task handle
+ fn new<T, Fut, CallbackF, CallbackFut>(
+ ex: Executor<'a>,
+ fut: Fut,
+ callback: CallbackF,
+ stop_signal: Arc<CondWait>,
+ ) -> TaskHandler
+ where
+ T: Send + Sync + 'a,
+ Fut: Future<Output = T> + Send + 'a,
+ CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a,
+ CallbackFut: Future<Output = ()> + Send + 'a,
+ {
+ let cancel_flag = Arc::new(CondWait::new());
+ let cancel_flag_c = cancel_flag.clone();
+ let task = ex.spawn(async move {
+ //start_signal.signal().await;
+ // Waits for either the stop signal or the task to complete.
+ let result = select(stop_signal.wait(), fut).await;
+ let result = match result {
+ Either::Left(_) => TaskResult::Cancelled,
+ Either::Right(res) => TaskResult::Completed(res),
+ };
+ // Call the callback with the result.
+ callback(result).await;
+ cancel_flag_c.signal().await;
+ });
+ TaskHandler { task, cancel_flag }
+ }
+ /// Cancels the task.
+ async fn cancel(self) {
+ self.cancel_flag.wait().await;
+ self.task.cancel().await;
+ }
+mod tests {
+ use super::*;
+ use std::{future, sync::Arc};
+ #[test]
+ fn test_task_group() {
+ let ex = Arc::new(smol::Executor::new());
+ smol::block_on(ex.clone().run(async move {
+ let group = Arc::new(TaskGroup::new());
+ group.spawn(ex.clone(), future::ready(0), |res| async move {
+ assert!(matches!(res, TaskResult::Completed(0)));
+ });
+ group.spawn(ex.clone(), future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+ let groupc = group.clone();
+ let exc = ex.clone();
+ group.spawn(
+ ex.clone(),
+ async move {
+ groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+ },
+ |res| async move {
+ assert!(matches!(res, TaskResult::Completed(_)));
+ },
+ );
+ // Do something
+ smol::Timer::after(std::time::Duration::from_millis(50)).await;
+ group.cancel().await;
+ }));
+ }
diff --git a/core/src/async_utils/timeout.rs b/core/src/async_utils/timeout.rs
new file mode 100644
index 0000000..7c55e1b
--- /dev/null
+++ b/core/src/async_utils/timeout.rs
@@ -0,0 +1,52 @@
+use std::{future::Future, time::Duration};
+use smol::Timer;
+use super::{select, Either};
+use crate::{error::Error, Result};
+/// Waits for a future to complete or times out if it exceeds a specified
+/// duration.
+/// # Example
+/// ```
+/// use std::{future, time::Duration};
+/// use karyons_core::async_utils::timeout;
+/// async {
+/// let fut = future::pending::<()>();
+/// assert!(timeout(Duration::from_millis(100), fut).await.is_err());
+/// };
+/// ```
+pub async fn timeout<T, F>(delay: Duration, future1: F) -> Result<T>
+ F: Future<Output = T>,
+ let result = select(Timer::after(delay), future1).await;
+ match result {
+ Either::Left(_) => Err(Error::Timeout),
+ Either::Right(res) => Ok(res),
+ }
+mod tests {
+ use super::*;
+ use std::{future, time::Duration};
+ #[test]
+ fn test_timeout() {
+ smol::block_on(async move {
+ let fut = future::pending::<()>();
+ assert!(timeout(Duration::from_millis(10), fut).await.is_err());
+ let fut = smol::Timer::after(Duration::from_millis(10));
+ assert!(timeout(Duration::from_millis(50), fut).await.is_ok())
+ });
+ }
diff --git a/core/src/error.rs b/core/src/error.rs
new file mode 100644
index 0000000..15947c8
--- /dev/null
+++ b/core/src/error.rs
@@ -0,0 +1,51 @@
+use thiserror::Error as ThisError;
+pub type Result<T> = std::result::Result<T, Error>;
+#[derive(ThisError, Debug)]
+pub enum Error {
+ #[error("IO Error: {0}")]
+ IO(#[from] std::io::Error),
+ #[error("Timeout Error")]
+ Timeout,
+ #[error("Path Not Found Error: {0}")]
+ PathNotFound(&'static str),
+ #[error("Channel Send Error: {0}")]
+ ChannelSend(String),
+ #[error("Channel Receive Error: {0}")]
+ ChannelRecv(String),
+ #[error("Decode Error: {0}")]
+ Decode(String),
+ #[error("Encode Error: {0}")]
+ Encode(String),
+impl<T> From<smol::channel::SendError<T>> for Error {
+ fn from(error: smol::channel::SendError<T>) -> Self {
+ Error::ChannelSend(error.to_string())
+ }
+impl From<smol::channel::RecvError> for Error {
+ fn from(error: smol::channel::RecvError) -> Self {
+ Error::ChannelRecv(error.to_string())
+ }
+impl From<bincode::error::DecodeError> for Error {
+ fn from(error: bincode::error::DecodeError) -> Self {
+ Error::Decode(error.to_string())
+ }
+impl From<bincode::error::EncodeError> for Error {
+ fn from(error: bincode::error::EncodeError) -> Self {
+ Error::Encode(error.to_string())
+ }
diff --git a/core/src/event.rs b/core/src/event.rs
new file mode 100644
index 0000000..b856385
--- /dev/null
+++ b/core/src/event.rs
@@ -0,0 +1,451 @@
+use std::{
+ any::Any,
+ collections::HashMap,
+ marker::PhantomData,
+ sync::{Arc, Weak},
+use chrono::{DateTime, Utc};
+use log::{error, trace};
+use smol::{
+ channel::{Receiver, Sender},
+ lock::Mutex,
+use crate::{utils::random_16, Result};
+pub type ArcEventSys<T> = Arc<EventSys<T>>;
+pub type WeakEventSys<T> = Weak<EventSys<T>>;
+pub type EventListenerID = u16;
+type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
+/// EventSys supports event emission to registered listeners based on topics.
+/// # Example
+/// ```
+/// use karyons_core::event::{EventSys, EventValueTopic, EventValue};
+/// async {
+/// let event_sys = EventSys::new();
+/// #[derive(Hash, PartialEq, Eq, Debug, Clone)]
+/// enum Topic {
+/// TopicA,
+/// TopicB,
+/// }
+/// #[derive(Clone, Debug, PartialEq)]
+/// struct A(usize);
+/// impl EventValue for A {
+/// fn id() -> &'static str {
+/// "A"
+/// }
+/// }
+/// let listener = event_sys.register::<A>(&Topic::TopicA).await;
+/// event_sys.emit_by_topic(&Topic::TopicA, &A(3)) .await;
+/// let msg: A = listener.recv().await.unwrap();
+/// #[derive(Clone, Debug, PartialEq)]
+/// struct B(usize);
+/// impl EventValue for B {
+/// fn id() -> &'static str {
+/// "B"
+/// }
+/// }
+/// impl EventValueTopic for B {
+/// type Topic = Topic;
+/// fn topic() -> Self::Topic{
+/// Topic::TopicB
+/// }
+/// }
+/// let listener = event_sys.register::<B>(&Topic::TopicB).await;
+/// event_sys.emit(&B(3)) .await;
+/// let msg: B = listener.recv().await.unwrap();
+/// // ....
+/// };
+/// ```
+pub struct EventSys<T> {
+ listeners: Mutex<Listeners<T>>,
+impl<T> EventSys<T>
+ T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
+ /// Creates a new `EventSys`
+ pub fn new() -> ArcEventSys<T> {
+ Arc::new(Self {
+ listeners: Mutex::new(HashMap::new()),
+ })
+ }
+ /// Emits an event to the listeners.
+ ///
+ /// The event must implement the `EventValueTopic` trait to indicate the
+ /// topic of the event. Otherwise, you can use `emit_by_topic()`.
+ pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) {
+ let topic = E::topic();
+ self.emit_by_topic(&topic, value).await;
+ }
+ /// Emits an event to the listeners.
+ pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) {
+ let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
+ let event = Event::new(value);
+ let mut topics = self.listeners.lock().await;
+ if !topics.contains_key(topic) {
+ error!("Failed to emit an event to a non-existent topic");
+ return;
+ }
+ let event_ids = topics.get_mut(topic).unwrap();
+ let event_id = E::id().to_string();
+ if !event_ids.contains_key(&event_id) {
+ error!("Failed to emit an event to a non-existent event id");
+ return;
+ }
+ let mut failed_listeners = vec![];
+ let listeners = event_ids.get_mut(&event_id).unwrap();
+ for (listener_id, listener) in listeners.iter() {
+ if let Err(err) = listener.send(event.clone()).await {
+ trace!("Failed to emit event for topic {:?}: {}", topic, err);
+ failed_listeners.push(*listener_id);
+ }
+ }
+ for listener_id in failed_listeners.iter() {
+ listeners.remove(listener_id);
+ }
+ }
+ /// Registers a new event listener for the given topic.
+ pub async fn register<E: EventValueAny + EventValue + Clone>(
+ self: &Arc<Self>,
+ topic: &T,
+ ) -> EventListener<T, E> {
+ let chan = smol::channel::unbounded();
+ let topics = &mut self.listeners.lock().await;
+ if !topics.contains_key(topic) {
+ topics.insert(topic.clone(), HashMap::new());
+ }
+ let event_ids = topics.get_mut(topic).unwrap();
+ let event_id = E::id().to_string();
+ if !event_ids.contains_key(&event_id) {
+ event_ids.insert(event_id.clone(), HashMap::new());
+ }
+ let listeners = event_ids.get_mut(&event_id).unwrap();
+ let mut listener_id = random_16();
+ while listeners.contains_key(&listener_id) {
+ listener_id = random_16();
+ }
+ let listener =
+ EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
+ listeners.insert(listener_id, chan.0);
+ listener
+ }
+ /// Removes an event listener attached to the given topic.
+ async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
+ let topics = &mut self.listeners.lock().await;
+ if !topics.contains_key(topic) {
+ error!("Failed to remove a non-existent topic");
+ return;
+ }
+ let event_ids = topics.get_mut(topic).unwrap();
+ if !event_ids.contains_key(event_id) {
+ error!("Failed to remove a non-existent event id");
+ return;
+ }
+ let listeners = event_ids.get_mut(event_id).unwrap();
+ if listeners.remove(listener_id).is_none() {
+ error!("Failed to remove a non-existent event listener");
+ }
+ }
+/// EventListener listens for and receives events from the `EventSys`.
+pub struct EventListener<T, E> {
+ id: EventListenerID,
+ recv_chan: Receiver<Event>,
+ event_sys: WeakEventSys<T>,
+ event_id: String,
+ topic: T,
+ phantom: PhantomData<E>,
+impl<T, E> EventListener<T, E>
+ T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
+ E: EventValueAny + Clone + EventValue,
+ /// Create a new event listener.
+ fn new(
+ id: EventListenerID,
+ event_sys: WeakEventSys<T>,
+ recv_chan: Receiver<Event>,
+ event_id: &str,
+ topic: &T,
+ ) -> EventListener<T, E> {
+ Self {
+ id,
+ recv_chan,
+ event_sys,
+ event_id: event_id.to_string(),
+ topic: topic.clone(),
+ phantom: PhantomData,
+ }
+ }
+ /// Receive the next event.
+ pub async fn recv(&self) -> Result<E> {
+ match self.recv_chan.recv().await {
+ Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() {
+ Some(v) => Ok(v.clone()),
+ None => unreachable!("Error when attempting to downcast the event value."),
+ },
+ Err(err) => {
+ error!("Failed to receive new event: {err}");
+ self.cancel().await;
+ Err(err.into())
+ }
+ }
+ }
+ /// Cancels the listener and removes it from the `EventSys`.
+ pub async fn cancel(&self) {
+ self.event_sys()
+ .remove(&self.topic, &self.event_id, &self.id)
+ .await;
+ }
+ /// Returns the topic for this event listener.
+ pub async fn topic(&self) -> &T {
+ &self.topic
+ }
+ /// Returns the event id for this event listener.
+ pub async fn event_id(&self) -> &String {
+ &self.event_id
+ }
+ fn event_sys(&self) -> ArcEventSys<T> {
+ self.event_sys.upgrade().unwrap()
+ }
+/// An event within the `EventSys`.
+#[derive(Clone, Debug)]
+pub struct Event {
+ /// The time at which the event was created.
+ created_at: DateTime<Utc>,
+ /// The value of the Event.
+ value: Arc<dyn EventValueAny>,
+impl Event {
+ /// Creates a new Event.
+ pub fn new(value: Arc<dyn EventValueAny>) -> Self {
+ Self {
+ created_at: Utc::now(),
+ value,
+ }
+ }
+impl std::fmt::Display for Event {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}: {:?}", self.created_at, self.value)
+ }
+pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug {
+ fn value_as_any(&self) -> &dyn Any;
+impl<T: Send + Sync + std::fmt::Debug + Any> EventValueAny for T {
+ fn value_as_any(&self) -> &dyn Any {
+ self
+ }
+pub trait EventValue: EventValueAny {
+ fn id() -> &'static str
+ where
+ Self: Sized;
+pub trait EventValueTopic: EventValueAny + EventValue {
+ type Topic;
+ fn topic() -> Self::Topic
+ where
+ Self: Sized;
+mod tests {
+ use super::*;
+ #[derive(Hash, PartialEq, Eq, Debug, Clone)]
+ enum Topic {
+ TopicA,
+ TopicB,
+ TopicC,
+ TopicD,
+ TopicE,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct A {
+ a_value: usize,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct B {
+ b_value: usize,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct C {
+ c_value: usize,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct D {
+ d_value: usize,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct E {
+ e_value: usize,
+ }
+ #[derive(Clone, Debug, PartialEq)]
+ struct F {
+ f_value: usize,
+ }
+ impl EventValue for A {
+ fn id() -> &'static str {
+ "A"
+ }
+ }
+ impl EventValue for B {
+ fn id() -> &'static str {
+ "B"
+ }
+ }
+ impl EventValue for C {
+ fn id() -> &'static str {
+ "C"
+ }
+ }
+ impl EventValue for D {
+ fn id() -> &'static str {
+ "D"
+ }
+ }
+ impl EventValue for E {
+ fn id() -> &'static str {
+ "E"
+ }
+ }
+ impl EventValue for F {
+ fn id() -> &'static str {
+ "F"
+ }
+ }
+ impl EventValueTopic for C {
+ type Topic = Topic;
+ fn topic() -> Self::Topic {
+ Topic::TopicC
+ }
+ }
+ #[test]
+ fn test_event_sys() {
+ smol::block_on(async move {
+ let event_sys = EventSys::<Topic>::new();
+ let a_listener = event_sys.register::<A>(&Topic::TopicA).await;
+ let b_listener = event_sys.register::<B>(&Topic::TopicB).await;
+ event_sys
+ .emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
+ .await;
+ event_sys
+ .emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
+ .await;
+ let msg = a_listener.recv().await.unwrap();
+ assert_eq!(msg, A { a_value: 3 });
+ let msg = b_listener.recv().await.unwrap();
+ assert_eq!(msg, B { b_value: 5 });
+ // register the same event type to different topics
+ let c_listener = event_sys.register::<C>(&Topic::TopicC).await;
+ let d_listener = event_sys.register::<C>(&Topic::TopicD).await;
+ event_sys.emit(&C { c_value: 10 }).await;
+ let msg = c_listener.recv().await.unwrap();
+ assert_eq!(msg, C { c_value: 10 });
+ event_sys
+ .emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
+ .await;
+ let msg = d_listener.recv().await.unwrap();
+ assert_eq!(msg, C { c_value: 10 });
+ // register different event types to the same topic
+ let e_listener = event_sys.register::<E>(&Topic::TopicE).await;
+ let f_listener = event_sys.register::<F>(&Topic::TopicE).await;
+ event_sys
+ .emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
+ .await;
+ let msg = e_listener.recv().await.unwrap();
+ assert_eq!(msg, E { e_value: 5 });
+ event_sys
+ .emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
+ .await;
+ let msg = f_listener.recv().await.unwrap();
+ assert_eq!(msg, F { f_value: 5 });
+ });
+ }
diff --git a/core/src/lib.rs b/core/src/lib.rs
new file mode 100644
index 0000000..83af888
--- /dev/null
+++ b/core/src/lib.rs
@@ -0,0 +1,21 @@
+/// A set of helper tools and functions.
+pub mod utils;
+/// A module containing async utilities that work with the `smol` async runtime.
+pub mod async_utils;
+/// Represents Karyons's Core Error.
+pub mod error;
+/// [`EventSys`](./event/struct.EventSys.html) Implementation
+pub mod event;
+/// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html)
+pub mod pubsub;
+use error::Result;
+use smol::Executor as SmolEx;
+use std::sync::Arc;
+/// A wrapper for smol::Executor
+pub type Executor<'a> = Arc<SmolEx<'a>>;
diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs
new file mode 100644
index 0000000..4cc0ab7
--- /dev/null
+++ b/core/src/pubsub.rs
@@ -0,0 +1,115 @@
+use std::{collections::HashMap, sync::Arc};
+use log::error;
+use smol::lock::Mutex;
+use crate::{utils::random_16, Result};
+pub type ArcPublisher<T> = Arc<Publisher<T>>;
+pub type SubscriptionID = u16;
+/// A simple publish-subscribe system.
+// # Example
+/// ```
+/// use karyons_core::pubsub::{Publisher};
+/// async {
+/// let publisher = Publisher::new();
+/// let sub = publisher.subscribe().await;
+/// publisher.notify(&String::from("MESSAGE")).await;
+/// let msg = sub.recv().await;
+/// // ....
+/// };
+/// ```
+pub struct Publisher<T> {
+ subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>,
+impl<T: Clone> Publisher<T> {
+ /// Creates a new Publisher
+ pub fn new() -> ArcPublisher<T> {
+ Arc::new(Self {
+ subs: Mutex::new(HashMap::new()),
+ })
+ }
+ /// Subscribe and return a Subscription
+ pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> {
+ let mut subs = self.subs.lock().await;
+ let chan = smol::channel::unbounded();
+ let mut sub_id = random_16();
+ // While the SubscriptionID already exists, generate a new one
+ while subs.contains_key(&sub_id) {
+ sub_id = random_16();
+ }
+ let sub = Subscription::new(sub_id, self.clone(), chan.1);
+ subs.insert(sub_id, chan.0);
+ sub
+ }
+ /// Unsubscribe from the Publisher
+ pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
+ self.subs.lock().await.remove(id);
+ }
+ /// Notify all subscribers
+ pub async fn notify(self: &Arc<Self>, value: &T) {
+ let mut subs = self.subs.lock().await;
+ let mut closed_subs = vec![];
+ for (sub_id, sub) in subs.iter() {
+ if let Err(err) = sub.send(value.clone()).await {
+ error!("failed to notify {}: {}", sub_id, err);
+ closed_subs.push(*sub_id);
+ }
+ }
+ for sub_id in closed_subs.iter() {
+ subs.remove(sub_id);
+ }
+ }
+// Subscription
+pub struct Subscription<T> {
+ id: SubscriptionID,
+ recv_chan: smol::channel::Receiver<T>,
+ publisher: ArcPublisher<T>,
+impl<T: Clone> Subscription<T> {
+ /// Creates a new Subscription
+ pub fn new(
+ id: SubscriptionID,
+ publisher: ArcPublisher<T>,
+ recv_chan: smol::channel::Receiver<T>,
+ ) -> Subscription<T> {
+ Self {
+ id,
+ recv_chan,
+ publisher,
+ }
+ }
+ /// Receive a message from the Publisher
+ pub async fn recv(&self) -> Result<T> {
+ let msg = self.recv_chan.recv().await?;
+ Ok(msg)
+ }
+ /// Unsubscribe from the Publisher
+ pub async fn unsubscribe(&self) {
+ self.publisher.unsubscribe(&self.id).await;
+ }
diff --git a/core/src/utils/decode.rs b/core/src/utils/decode.rs
new file mode 100644
index 0000000..a8a6522
--- /dev/null
+++ b/core/src/utils/decode.rs
@@ -0,0 +1,10 @@
+use bincode::Decode;
+use crate::Result;
+/// Decodes a given type `T` from the given slice. returns the decoded value
+/// along with the number of bytes read.
+pub fn decode<T: Decode>(src: &[u8]) -> Result<(T, usize)> {
+ let (result, bytes_read) = bincode::decode_from_slice(src, bincode::config::standard())?;
+ Ok((result, bytes_read))
diff --git a/core/src/utils/encode.rs b/core/src/utils/encode.rs
new file mode 100644
index 0000000..7d1061b
--- /dev/null
+++ b/core/src/utils/encode.rs
@@ -0,0 +1,15 @@
+use bincode::Encode;
+use crate::Result;
+/// Encode the given type `T` into a `Vec<u8>`.
+pub fn encode<T: Encode>(msg: &T) -> Result<Vec<u8>> {
+ let vec = bincode::encode_to_vec(msg, bincode::config::standard())?;
+ Ok(vec)
+/// Encode the given type `T` into the given slice..
+pub fn encode_into_slice<T: Encode>(msg: &T, dst: &mut [u8]) -> Result<()> {
+ bincode::encode_into_slice(msg, dst, bincode::config::standard())?;
+ Ok(())
diff --git a/core/src/utils/mod.rs b/core/src/utils/mod.rs
new file mode 100644
index 0000000..a3c3f50
--- /dev/null
+++ b/core/src/utils/mod.rs
@@ -0,0 +1,19 @@
+mod decode;
+mod encode;
+mod path;
+pub use decode::decode;
+pub use encode::{encode, encode_into_slice};
+pub use path::{home_dir, tilde_expand};
+use rand::{rngs::OsRng, Rng};
+/// Generates and returns a random u32 using `rand::rngs::OsRng`.
+pub fn random_32() -> u32 {
+ OsRng.gen()
+/// Generates and returns a random u16 using `rand::rngs::OsRng`.
+pub fn random_16() -> u16 {
+ OsRng.gen()
diff --git a/core/src/utils/path.rs b/core/src/utils/path.rs
new file mode 100644
index 0000000..2cd900a
--- /dev/null
+++ b/core/src/utils/path.rs
@@ -0,0 +1,39 @@
+use std::path::PathBuf;
+use crate::{error::Error, Result};
+/// Returns the user's home directory as a `PathBuf`.
+pub fn home_dir() -> Result<PathBuf> {
+ dirs::home_dir().ok_or(Error::PathNotFound("Home dir not found"))
+/// Expands a tilde (~) in a path and returns the expanded `PathBuf`.
+pub fn tilde_expand(path: &str) -> Result<PathBuf> {
+ match path {
+ "~" => home_dir(),
+ p if p.starts_with("~/") => Ok(home_dir()?.join(&path[2..])),
+ _ => Ok(PathBuf::from(path)),
+ }
+mod tests {
+ use super::*;
+ #[test]
+ fn test_tilde_expand() {
+ let path = "~/src";
+ let expanded_path = dirs::home_dir().unwrap().join("src");
+ assert_eq!(tilde_expand(path).unwrap(), expanded_path);
+ let path = "~";
+ let expanded_path = dirs::home_dir().unwrap();
+ assert_eq!(tilde_expand(path).unwrap(), expanded_path);
+ let path = "";
+ let expanded_path = PathBuf::from("");
+ assert_eq!(tilde_expand(path).unwrap(), expanded_path);
+ }