diff options
Diffstat (limited to 'karyons_core/src')
-rw-r--r-- | karyons_core/src/async_utils/backoff.rs | 115 | ||||
-rw-r--r-- | karyons_core/src/async_utils/condvar.rs | 387 | ||||
-rw-r--r-- | karyons_core/src/async_utils/condwait.rs | 96 | ||||
-rw-r--r-- | karyons_core/src/async_utils/mod.rs | 13 | ||||
-rw-r--r-- | karyons_core/src/async_utils/select.rs | 99 | ||||
-rw-r--r-- | karyons_core/src/async_utils/task_group.rs | 197 | ||||
-rw-r--r-- | karyons_core/src/async_utils/timeout.rs | 52 | ||||
-rw-r--r-- | karyons_core/src/error.rs | 51 | ||||
-rw-r--r-- | karyons_core/src/event.rs | 451 | ||||
-rw-r--r-- | karyons_core/src/lib.rs | 21 | ||||
-rw-r--r-- | karyons_core/src/pubsub.rs | 115 | ||||
-rw-r--r-- | karyons_core/src/utils/decode.rs | 10 | ||||
-rw-r--r-- | karyons_core/src/utils/encode.rs | 15 | ||||
-rw-r--r-- | karyons_core/src/utils/mod.rs | 19 | ||||
-rw-r--r-- | karyons_core/src/utils/path.rs | 39 |
15 files changed, 0 insertions, 1680 deletions
diff --git a/karyons_core/src/async_utils/backoff.rs b/karyons_core/src/async_utils/backoff.rs deleted file mode 100644 index f7e131d..0000000 --- a/karyons_core/src/async_utils/backoff.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{ - cmp::min, - sync::atomic::{AtomicBool, AtomicU32, Ordering}, - time::Duration, -}; - -use smol::Timer; - -/// Exponential backoff -/// <https://en.wikipedia.org/wiki/Exponential_backoff> -/// -/// # Examples -/// -/// ``` -/// use karyons_core::async_utils::Backoff; -/// -/// async { -/// let backoff = Backoff::new(300, 3000); -/// -/// loop { -/// backoff.sleep().await; -/// -/// // do something -/// break; -/// } -/// -/// backoff.reset(); -/// -/// // .... -/// }; -/// -/// ``` -/// -pub struct Backoff { - /// The base delay in milliseconds for the initial retry. - base_delay: u64, - /// The max delay in milliseconds allowed for a retry. - max_delay: u64, - /// Atomic counter - retries: AtomicU32, - /// Stop flag - stop: AtomicBool, -} - -impl Backoff { - /// Creates a new Backoff. - pub fn new(base_delay: u64, max_delay: u64) -> Self { - Self { - base_delay, - max_delay, - retries: AtomicU32::new(0), - stop: AtomicBool::new(false), - } - } - - /// Sleep based on the current retry count and delay values. - /// Retruns the delay value. - pub async fn sleep(&self) -> u64 { - if self.stop.load(Ordering::SeqCst) { - Timer::after(Duration::from_millis(self.max_delay)).await; - return self.max_delay; - } - - let retries = self.retries.load(Ordering::SeqCst); - let delay = self.base_delay * (2_u64).pow(retries); - let delay = min(delay, self.max_delay); - - if delay == self.max_delay { - self.stop.store(true, Ordering::SeqCst); - } - - self.retries.store(retries + 1, Ordering::SeqCst); - - Timer::after(Duration::from_millis(delay)).await; - delay - } - - /// Reset the retry counter to 0. - pub fn reset(&self) { - self.retries.store(0, Ordering::SeqCst); - self.stop.store(false, Ordering::SeqCst); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Arc; - - #[test] - fn test_backoff() { - smol::block_on(async move { - let backoff = Arc::new(Backoff::new(5, 15)); - let backoff_c = backoff.clone(); - smol::spawn(async move { - let delay = backoff_c.sleep().await; - assert_eq!(delay, 5); - - let delay = backoff_c.sleep().await; - assert_eq!(delay, 10); - - let delay = backoff_c.sleep().await; - assert_eq!(delay, 15); - }) - .await; - - smol::spawn(async move { - backoff.reset(); - let delay = backoff.sleep().await; - assert_eq!(delay, 5); - }) - .await; - }); - } -} diff --git a/karyons_core/src/async_utils/condvar.rs b/karyons_core/src/async_utils/condvar.rs deleted file mode 100644 index 814f78f..0000000 --- a/karyons_core/src/async_utils/condvar.rs +++ /dev/null @@ -1,387 +0,0 @@ -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - sync::Mutex, - task::{Context, Poll, Waker}, -}; - -use smol::lock::MutexGuard; - -use crate::utils::random_16; - -/// CondVar is an async version of <https://doc.rust-lang.org/std/sync/struct.Condvar.html> -/// -/// # Example -/// -///``` -/// use std::sync::Arc; -/// -/// use smol::lock::Mutex; -/// -/// use karyons_core::async_utils::CondVar; -/// -/// async { -/// -/// let val = Arc::new(Mutex::new(false)); -/// let condvar = Arc::new(CondVar::new()); -/// -/// let val_cloned = val.clone(); -/// let condvar_cloned = condvar.clone(); -/// smol::spawn(async move { -/// let mut val = val_cloned.lock().await; -/// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } -/// -/// // ... -/// }); -/// -/// let condvar_cloned = condvar.clone(); -/// smol::spawn(async move { -/// let mut val = val.lock().await; -/// -/// // While the boolean flag is false, wait for a signal. -/// while !*val { -/// val = condvar_cloned.wait(val).await; -/// } -/// -/// // ... -/// }); -/// -/// // Wake up all waiting tasks on this condvar -/// condvar.broadcast(); -/// }; -/// -/// ``` - -pub struct CondVar { - inner: Mutex<Wakers>, -} - -impl CondVar { - /// Creates a new CondVar - pub fn new() -> Self { - Self { - inner: Mutex::new(Wakers::new()), - } - } - - /// Blocks the current task until this condition variable receives a notification. - pub async fn wait<'a, T>(&self, g: MutexGuard<'a, T>) -> MutexGuard<'a, T> { - let m = MutexGuard::source(&g); - - CondVarAwait::new(self, g).await; - - m.lock().await - } - - /// Wakes up one blocked task waiting on this condvar. - pub fn signal(&self) { - self.inner.lock().unwrap().wake(true); - } - - /// Wakes up all blocked tasks waiting on this condvar. - pub fn broadcast(&self) { - self.inner.lock().unwrap().wake(false); - } -} - -impl Default for CondVar { - fn default() -> Self { - Self::new() - } -} - -struct CondVarAwait<'a, T> { - id: Option<u16>, - condvar: &'a CondVar, - guard: Option<MutexGuard<'a, T>>, -} - -impl<'a, T> CondVarAwait<'a, T> { - fn new(condvar: &'a CondVar, guard: MutexGuard<'a, T>) -> Self { - Self { - condvar, - guard: Some(guard), - id: None, - } - } -} - -impl<'a, T> Future for CondVarAwait<'a, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut inner = self.condvar.inner.lock().unwrap(); - - match self.guard.take() { - Some(_) => { - // the first pooll will release the Mutexguard - self.id = Some(inner.put(Some(cx.waker().clone()))); - Poll::Pending - } - None => { - // Return Ready if it has already been polled and removed - // from the waker list. - if self.id.is_none() { - return Poll::Ready(()); - } - - let i = self.id.as_ref().unwrap(); - match inner.wakers.get_mut(i).unwrap() { - Some(wk) => { - // This will prevent cloning again - if !wk.will_wake(cx.waker()) { - *wk = cx.waker().clone(); - } - Poll::Pending - } - None => { - inner.delete(i); - self.id = None; - Poll::Ready(()) - } - } - } - } - } -} - -impl<'a, T> Drop for CondVarAwait<'a, T> { - fn drop(&mut self) { - if let Some(id) = self.id { - let mut inner = self.condvar.inner.lock().unwrap(); - if let Some(wk) = inner.wakers.get_mut(&id).unwrap().take() { - wk.wake() - } - } - } -} - -/// Wakers is a helper struct to store the task wakers -struct Wakers { - wakers: HashMap<u16, Option<Waker>>, -} - -impl Wakers { - fn new() -> Self { - Self { - wakers: HashMap::new(), - } - } - - fn put(&mut self, waker: Option<Waker>) -> u16 { - let mut id: u16; - - id = random_16(); - while self.wakers.contains_key(&id) { - id = random_16(); - } - - self.wakers.insert(id, waker); - id - } - - fn delete(&mut self, id: &u16) -> Option<Option<Waker>> { - self.wakers.remove(id) - } - - fn wake(&mut self, signal: bool) { - for (_, wk) in self.wakers.iter_mut() { - match wk.take() { - Some(w) => { - w.wake(); - if signal { - break; - } - } - None => continue, - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use smol::lock::Mutex; - use std::{ - collections::VecDeque, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - }; - - // The tests below demonstrate a solution to a problem in the Wikipedia - // explanation of condition variables: - // https://en.wikipedia.org/wiki/Monitor_(synchronization)#Solving_the_bounded_producer/consumer_problem. - - struct Queue { - items: VecDeque<String>, - max_len: usize, - } - impl Queue { - fn new(max_len: usize) -> Self { - Self { - items: VecDeque::new(), - max_len, - } - } - - fn is_full(&self) -> bool { - self.items.len() == self.max_len - } - - fn is_empty(&self) -> bool { - self.items.is_empty() - } - } - - #[test] - fn test_condvar_signal() { - smol::block_on(async { - let number_of_tasks = 30; - - let queue = Arc::new(Mutex::new(Queue::new(5))); - let condvar_full = Arc::new(CondVar::new()); - let condvar_empty = Arc::new(CondVar::new()); - - let queue_cloned = queue.clone(); - let condvar_full_cloned = condvar_full.clone(); - let condvar_empty_cloned = condvar_empty.clone(); - - let _producer1 = smol::spawn(async move { - for i in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_full_cloned.wait(queue).await; - } - - queue.items.push_back(format!("task {i}")); - - // Wake up the consumer - condvar_empty_cloned.signal(); - } - }); - - let queue_cloned = queue.clone(); - let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - let consumer = smol::spawn(async move { - for _ in 1..number_of_tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar_empty.wait(queue).await; - } - - let _ = queue.items.pop_front().unwrap(); - - task_consumed_.fetch_add(1, Ordering::Relaxed); - - // Do something - - // Wake up the producer - condvar_full.signal(); - } - }); - - consumer.await; - assert!(queue.lock().await.is_empty()); - assert_eq!(task_consumed.load(Ordering::Relaxed), 29); - }); - } - - #[test] - fn test_condvar_broadcast() { - smol::block_on(async { - let tasks = 30; - - let queue = Arc::new(Mutex::new(Queue::new(5))); - let condvar = Arc::new(CondVar::new()); - - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer1 = smol::spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } - - queue.items.push_back(format!("producer1: task {i}")); - - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); - } - }); - - let queue_cloned = queue.clone(); - let condvar_cloned = condvar.clone(); - let _producer2 = smol::spawn(async move { - for i in 1..tasks { - // Lock queue mtuex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-full - while queue.is_full() { - // Release queue mutex and sleep - queue = condvar_cloned.wait(queue).await; - } - - queue.items.push_back(format!("producer2: task {i}")); - - // Wake up all producer and consumer tasks - condvar_cloned.broadcast(); - } - }); - - let queue_cloned = queue.clone(); - let task_consumed = Arc::new(AtomicUsize::new(0)); - let task_consumed_ = task_consumed.clone(); - - let consumer = smol::spawn(async move { - for _ in 1..((tasks * 2) - 1) { - { - // Lock queue mutex - let mut queue = queue_cloned.lock().await; - - // Check if the queue is non-empty - while queue.is_empty() { - // Release queue mutex and sleep - queue = condvar.wait(queue).await; - } - - let _ = queue.items.pop_front().unwrap(); - - task_consumed_.fetch_add(1, Ordering::Relaxed); - - // Do something - - // Wake up all producer and consumer tasks - condvar.broadcast(); - } - } - }); - - consumer.await; - assert!(queue.lock().await.is_empty()); - assert_eq!(task_consumed.load(Ordering::Relaxed), 58); - }); - } -} diff --git a/karyons_core/src/async_utils/condwait.rs b/karyons_core/src/async_utils/condwait.rs deleted file mode 100644 index f16a99e..0000000 --- a/karyons_core/src/async_utils/condwait.rs +++ /dev/null @@ -1,96 +0,0 @@ -use smol::lock::Mutex; - -use super::CondVar; - -/// CondWait is a wrapper struct for CondVar with a Mutex boolean flag. -/// -/// # Example -/// -///``` -/// use std::sync::Arc; -/// -/// use karyons_core::async_utils::CondWait; -/// -/// async { -/// let cond_wait = Arc::new(CondWait::new()); -/// let cond_wait_cloned = cond_wait.clone(); -/// let task = smol::spawn(async move { -/// cond_wait_cloned.wait().await; -/// // ... -/// }); -/// -/// cond_wait.signal().await; -/// }; -/// -/// ``` -/// -pub struct CondWait { - /// The CondVar - condvar: CondVar, - /// Boolean flag - w: Mutex<bool>, -} - -impl CondWait { - /// Creates a new CondWait. - pub fn new() -> Self { - Self { - condvar: CondVar::new(), - w: Mutex::new(false), - } - } - - /// Waits for a signal or broadcast. - pub async fn wait(&self) { - let mut w = self.w.lock().await; - - // While the boolean flag is false, wait for a signal. - while !*w { - w = self.condvar.wait(w).await; - } - } - - /// Signal a waiting task. - pub async fn signal(&self) { - *self.w.lock().await = true; - self.condvar.signal(); - } - - /// Signal all waiting tasks. - pub async fn broadcast(&self) { - *self.w.lock().await = true; - self.condvar.broadcast(); - } - - /// Reset the boolean flag value to false. - pub async fn reset(&self) { - *self.w.lock().await = false; - } -} - -impl Default for CondWait { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Arc; - - #[test] - fn test_cond_wait() { - smol::block_on(async { - let cond_wait = Arc::new(CondWait::new()); - let cond_wait_cloned = cond_wait.clone(); - let task = smol::spawn(async move { - cond_wait_cloned.wait().await; - true - }); - - cond_wait.signal().await; - assert!(task.await); - }); - } -} diff --git a/karyons_core/src/async_utils/mod.rs b/karyons_core/src/async_utils/mod.rs deleted file mode 100644 index c871bad..0000000 --- a/karyons_core/src/async_utils/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod backoff; -mod condvar; -mod condwait; -mod select; -mod task_group; -mod timeout; - -pub use backoff::Backoff; -pub use condvar::CondVar; -pub use condwait::CondWait; -pub use select::{select, Either}; -pub use task_group::{TaskGroup, TaskResult}; -pub use timeout::timeout; diff --git a/karyons_core/src/async_utils/select.rs b/karyons_core/src/async_utils/select.rs deleted file mode 100644 index d61b355..0000000 --- a/karyons_core/src/async_utils/select.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project_lite::pin_project; -use smol::future::Future; - -/// Returns the result of the future that completes first, preferring future1 -/// if both are ready. -/// -/// # Examples -/// -/// ``` -/// use std::future; -/// -/// use karyons_core::async_utils::{select, Either}; -/// -/// async { -/// let fut1 = future::pending::<String>(); -/// let fut2 = future::ready(0); -/// let res = select(fut1, fut2).await; -/// assert!(matches!(res, Either::Right(0))); -/// // .... -/// }; -/// -/// ``` -/// -pub fn select<T1, T2, F1, F2>(future1: F1, future2: F2) -> Select<F1, F2> -where - F1: Future<Output = T1>, - F2: Future<Output = T2>, -{ - Select { future1, future2 } -} - -pin_project! { - #[derive(Debug)] - pub struct Select<F1, F2> { - #[pin] - future1: F1, - #[pin] - future2: F2, - } -} - -/// The return value from the `select` function, indicating which future -/// completed first. -#[derive(Debug)] -pub enum Either<T1, T2> { - Left(T1), - Right(T2), -} - -// Implement the Future trait for the Select struct. -impl<T1, T2, F1, F2> Future for Select<F1, F2> -where - F1: Future<Output = T1>, - F2: Future<Output = T2>, -{ - type Output = Either<T1, T2>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let this = self.project(); - - if let Poll::Ready(t) = this.future1.poll(cx) { - return Poll::Ready(Either::Left(t)); - } - - if let Poll::Ready(t) = this.future2.poll(cx) { - return Poll::Ready(Either::Right(t)); - } - - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::{select, Either}; - use smol::Timer; - use std::future; - - #[test] - fn test_async_select() { - smol::block_on(async move { - let fut = select(Timer::never(), future::ready(0 as u32)).await; - assert!(matches!(fut, Either::Right(0))); - - let fut1 = future::pending::<String>(); - let fut2 = future::ready(0); - let res = select(fut1, fut2).await; - assert!(matches!(res, Either::Right(0))); - - let fut1 = future::ready(0); - let fut2 = future::pending::<String>(); - let res = select(fut1, fut2).await; - assert!(matches!(res, Either::Left(_))); - }); - } -} diff --git a/karyons_core/src/async_utils/task_group.rs b/karyons_core/src/async_utils/task_group.rs deleted file mode 100644 index 8707d0e..0000000 --- a/karyons_core/src/async_utils/task_group.rs +++ /dev/null @@ -1,197 +0,0 @@ -use std::{future::Future, sync::Arc, sync::Mutex}; - -use smol::Task; - -use crate::Executor; - -use super::{select, CondWait, Either}; - -/// TaskGroup is a group of spawned tasks. -/// -/// # Example -/// -/// ``` -/// -/// use std::sync::Arc; -/// -/// use karyons_core::async_utils::TaskGroup; -/// -/// async { -/// -/// let ex = Arc::new(smol::Executor::new()); -/// let group = TaskGroup::new(); -/// -/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {}); -/// -/// group.cancel().await; -/// -/// }; -/// -/// ``` -/// -pub struct TaskGroup { - tasks: Mutex<Vec<TaskHandler>>, - stop_signal: Arc<CondWait>, -} - -impl<'a> TaskGroup { - /// Creates a new task group - pub fn new() -> Self { - Self { - tasks: Mutex::new(Vec::new()), - stop_signal: Arc::new(CondWait::new()), - } - } - - /// Spawns a new task and calls the callback after it has completed - /// or been canceled. The callback will have the `TaskResult` as a - /// parameter, indicating whether the task completed or was canceled. - pub fn spawn<T, Fut, CallbackF, CallbackFut>( - &self, - executor: Executor<'a>, - fut: Fut, - callback: CallbackF, - ) where - T: Send + Sync + 'a, - Fut: Future<Output = T> + Send + 'a, - CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, - CallbackFut: Future<Output = ()> + Send + 'a, - { - let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone()); - self.tasks.lock().unwrap().push(task); - } - - /// Checks if the task group is empty. - pub fn is_empty(&self) -> bool { - self.tasks.lock().unwrap().is_empty() - } - - /// Get the number of the tasks in the group. - pub fn len(&self) -> usize { - self.tasks.lock().unwrap().len() - } - - /// Cancels all tasks in the group. - pub async fn cancel(&self) { - self.stop_signal.broadcast().await; - - loop { - let task = self.tasks.lock().unwrap().pop(); - if let Some(t) = task { - t.cancel().await - } else { - break; - } - } - } -} - -impl Default for TaskGroup { - fn default() -> Self { - Self::new() - } -} - -/// The result of a spawned task. -#[derive(Debug)] -pub enum TaskResult<T> { - Completed(T), - Cancelled, -} - -impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - TaskResult::Cancelled => write!(f, "Task cancelled"), - TaskResult::Completed(res) => write!(f, "Task completed: {:?}", res), - } - } -} - -/// TaskHandler -pub struct TaskHandler { - task: Task<()>, - cancel_flag: Arc<CondWait>, -} - -impl<'a> TaskHandler { - /// Creates a new task handle - fn new<T, Fut, CallbackF, CallbackFut>( - ex: Executor<'a>, - fut: Fut, - callback: CallbackF, - stop_signal: Arc<CondWait>, - ) -> TaskHandler - where - T: Send + Sync + 'a, - Fut: Future<Output = T> + Send + 'a, - CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, - CallbackFut: Future<Output = ()> + Send + 'a, - { - let cancel_flag = Arc::new(CondWait::new()); - let cancel_flag_c = cancel_flag.clone(); - let task = ex.spawn(async move { - //start_signal.signal().await; - // Waits for either the stop signal or the task to complete. - let result = select(stop_signal.wait(), fut).await; - - let result = match result { - Either::Left(_) => TaskResult::Cancelled, - Either::Right(res) => TaskResult::Completed(res), - }; - - // Call the callback with the result. - callback(result).await; - - cancel_flag_c.signal().await; - }); - - TaskHandler { task, cancel_flag } - } - - /// Cancels the task. - async fn cancel(self) { - self.cancel_flag.wait().await; - self.task.cancel().await; - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::{future, sync::Arc}; - - #[test] - fn test_task_group() { - let ex = Arc::new(smol::Executor::new()); - smol::block_on(ex.clone().run(async move { - let group = Arc::new(TaskGroup::new()); - - group.spawn(ex.clone(), future::ready(0), |res| async move { - assert!(matches!(res, TaskResult::Completed(0))); - }); - - group.spawn(ex.clone(), future::pending::<()>(), |res| async move { - assert!(matches!(res, TaskResult::Cancelled)); - }); - - let groupc = group.clone(); - let exc = ex.clone(); - group.spawn( - ex.clone(), - async move { - groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move { - assert!(matches!(res, TaskResult::Cancelled)); - }); - }, - |res| async move { - assert!(matches!(res, TaskResult::Completed(_))); - }, - ); - - // Do something - smol::Timer::after(std::time::Duration::from_millis(50)).await; - group.cancel().await; - })); - } -} diff --git a/karyons_core/src/async_utils/timeout.rs b/karyons_core/src/async_utils/timeout.rs deleted file mode 100644 index 7c55e1b..0000000 --- a/karyons_core/src/async_utils/timeout.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::{future::Future, time::Duration}; - -use smol::Timer; - -use super::{select, Either}; -use crate::{error::Error, Result}; - -/// Waits for a future to complete or times out if it exceeds a specified -/// duration. -/// -/// # Example -/// -/// ``` -/// use std::{future, time::Duration}; -/// -/// use karyons_core::async_utils::timeout; -/// -/// async { -/// let fut = future::pending::<()>(); -/// assert!(timeout(Duration::from_millis(100), fut).await.is_err()); -/// }; -/// -/// ``` -/// -pub async fn timeout<T, F>(delay: Duration, future1: F) -> Result<T> -where - F: Future<Output = T>, -{ - let result = select(Timer::after(delay), future1).await; - - match result { - Either::Left(_) => Err(Error::Timeout), - Either::Right(res) => Ok(res), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::{future, time::Duration}; - - #[test] - fn test_timeout() { - smol::block_on(async move { - let fut = future::pending::<()>(); - assert!(timeout(Duration::from_millis(10), fut).await.is_err()); - - let fut = smol::Timer::after(Duration::from_millis(10)); - assert!(timeout(Duration::from_millis(50), fut).await.is_ok()) - }); - } -} diff --git a/karyons_core/src/error.rs b/karyons_core/src/error.rs deleted file mode 100644 index 15947c8..0000000 --- a/karyons_core/src/error.rs +++ /dev/null @@ -1,51 +0,0 @@ -use thiserror::Error as ThisError; - -pub type Result<T> = std::result::Result<T, Error>; - -#[derive(ThisError, Debug)] -pub enum Error { - #[error("IO Error: {0}")] - IO(#[from] std::io::Error), - - #[error("Timeout Error")] - Timeout, - - #[error("Path Not Found Error: {0}")] - PathNotFound(&'static str), - - #[error("Channel Send Error: {0}")] - ChannelSend(String), - - #[error("Channel Receive Error: {0}")] - ChannelRecv(String), - - #[error("Decode Error: {0}")] - Decode(String), - - #[error("Encode Error: {0}")] - Encode(String), -} - -impl<T> From<smol::channel::SendError<T>> for Error { - fn from(error: smol::channel::SendError<T>) -> Self { - Error::ChannelSend(error.to_string()) - } -} - -impl From<smol::channel::RecvError> for Error { - fn from(error: smol::channel::RecvError) -> Self { - Error::ChannelRecv(error.to_string()) - } -} - -impl From<bincode::error::DecodeError> for Error { - fn from(error: bincode::error::DecodeError) -> Self { - Error::Decode(error.to_string()) - } -} - -impl From<bincode::error::EncodeError> for Error { - fn from(error: bincode::error::EncodeError) -> Self { - Error::Encode(error.to_string()) - } -} diff --git a/karyons_core/src/event.rs b/karyons_core/src/event.rs deleted file mode 100644 index b856385..0000000 --- a/karyons_core/src/event.rs +++ /dev/null @@ -1,451 +0,0 @@ -use std::{ - any::Any, - collections::HashMap, - marker::PhantomData, - sync::{Arc, Weak}, -}; - -use chrono::{DateTime, Utc}; -use log::{error, trace}; -use smol::{ - channel::{Receiver, Sender}, - lock::Mutex, -}; - -use crate::{utils::random_16, Result}; - -pub type ArcEventSys<T> = Arc<EventSys<T>>; -pub type WeakEventSys<T> = Weak<EventSys<T>>; -pub type EventListenerID = u16; - -type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>; - -/// EventSys supports event emission to registered listeners based on topics. -/// # Example -/// -/// ``` -/// use karyons_core::event::{EventSys, EventValueTopic, EventValue}; -/// -/// async { -/// let event_sys = EventSys::new(); -/// -/// #[derive(Hash, PartialEq, Eq, Debug, Clone)] -/// enum Topic { -/// TopicA, -/// TopicB, -/// } -/// -/// #[derive(Clone, Debug, PartialEq)] -/// struct A(usize); -/// -/// impl EventValue for A { -/// fn id() -> &'static str { -/// "A" -/// } -/// } -/// -/// let listener = event_sys.register::<A>(&Topic::TopicA).await; -/// -/// event_sys.emit_by_topic(&Topic::TopicA, &A(3)) .await; -/// let msg: A = listener.recv().await.unwrap(); -/// -/// #[derive(Clone, Debug, PartialEq)] -/// struct B(usize); -/// -/// impl EventValue for B { -/// fn id() -> &'static str { -/// "B" -/// } -/// } -/// -/// impl EventValueTopic for B { -/// type Topic = Topic; -/// fn topic() -> Self::Topic{ -/// Topic::TopicB -/// } -/// } -/// -/// let listener = event_sys.register::<B>(&Topic::TopicB).await; -/// -/// event_sys.emit(&B(3)) .await; -/// let msg: B = listener.recv().await.unwrap(); -/// -/// // .... -/// }; -/// -/// ``` -/// -pub struct EventSys<T> { - listeners: Mutex<Listeners<T>>, -} - -impl<T> EventSys<T> -where - T: std::hash::Hash + Eq + std::fmt::Debug + Clone, -{ - /// Creates a new `EventSys` - pub fn new() -> ArcEventSys<T> { - Arc::new(Self { - listeners: Mutex::new(HashMap::new()), - }) - } - - /// Emits an event to the listeners. - /// - /// The event must implement the `EventValueTopic` trait to indicate the - /// topic of the event. Otherwise, you can use `emit_by_topic()`. - pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) { - let topic = E::topic(); - self.emit_by_topic(&topic, value).await; - } - - /// Emits an event to the listeners. - pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) { - let value: Arc<dyn EventValueAny> = Arc::new(value.clone()); - let event = Event::new(value); - - let mut topics = self.listeners.lock().await; - - if !topics.contains_key(topic) { - error!("Failed to emit an event to a non-existent topic"); - return; - } - - let event_ids = topics.get_mut(topic).unwrap(); - let event_id = E::id().to_string(); - - if !event_ids.contains_key(&event_id) { - error!("Failed to emit an event to a non-existent event id"); - return; - } - - let mut failed_listeners = vec![]; - - let listeners = event_ids.get_mut(&event_id).unwrap(); - for (listener_id, listener) in listeners.iter() { - if let Err(err) = listener.send(event.clone()).await { - trace!("Failed to emit event for topic {:?}: {}", topic, err); - failed_listeners.push(*listener_id); - } - } - - for listener_id in failed_listeners.iter() { - listeners.remove(listener_id); - } - } - - /// Registers a new event listener for the given topic. - pub async fn register<E: EventValueAny + EventValue + Clone>( - self: &Arc<Self>, - topic: &T, - ) -> EventListener<T, E> { - let chan = smol::channel::unbounded(); - - let topics = &mut self.listeners.lock().await; - - if !topics.contains_key(topic) { - topics.insert(topic.clone(), HashMap::new()); - } - - let event_ids = topics.get_mut(topic).unwrap(); - let event_id = E::id().to_string(); - - if !event_ids.contains_key(&event_id) { - event_ids.insert(event_id.clone(), HashMap::new()); - } - - let listeners = event_ids.get_mut(&event_id).unwrap(); - - let mut listener_id = random_16(); - while listeners.contains_key(&listener_id) { - listener_id = random_16(); - } - - let listener = - EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic); - - listeners.insert(listener_id, chan.0); - - listener - } - - /// Removes an event listener attached to the given topic. - async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) { - let topics = &mut self.listeners.lock().await; - if !topics.contains_key(topic) { - error!("Failed to remove a non-existent topic"); - return; - } - - let event_ids = topics.get_mut(topic).unwrap(); - if !event_ids.contains_key(event_id) { - error!("Failed to remove a non-existent event id"); - return; - } - - let listeners = event_ids.get_mut(event_id).unwrap(); - if listeners.remove(listener_id).is_none() { - error!("Failed to remove a non-existent event listener"); - } - } -} - -/// EventListener listens for and receives events from the `EventSys`. -pub struct EventListener<T, E> { - id: EventListenerID, - recv_chan: Receiver<Event>, - event_sys: WeakEventSys<T>, - event_id: String, - topic: T, - phantom: PhantomData<E>, -} - -impl<T, E> EventListener<T, E> -where - T: std::hash::Hash + Eq + Clone + std::fmt::Debug, - E: EventValueAny + Clone + EventValue, -{ - /// Create a new event listener. - fn new( - id: EventListenerID, - event_sys: WeakEventSys<T>, - recv_chan: Receiver<Event>, - event_id: &str, - topic: &T, - ) -> EventListener<T, E> { - Self { - id, - recv_chan, - event_sys, - event_id: event_id.to_string(), - topic: topic.clone(), - phantom: PhantomData, - } - } - - /// Receive the next event. - pub async fn recv(&self) -> Result<E> { - match self.recv_chan.recv().await { - Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() { - Some(v) => Ok(v.clone()), - None => unreachable!("Error when attempting to downcast the event value."), - }, - Err(err) => { - error!("Failed to receive new event: {err}"); - self.cancel().await; - Err(err.into()) - } - } - } - - /// Cancels the listener and removes it from the `EventSys`. - pub async fn cancel(&self) { - self.event_sys() - .remove(&self.topic, &self.event_id, &self.id) - .await; - } - - /// Returns the topic for this event listener. - pub async fn topic(&self) -> &T { - &self.topic - } - - /// Returns the event id for this event listener. - pub async fn event_id(&self) -> &String { - &self.event_id - } - - fn event_sys(&self) -> ArcEventSys<T> { - self.event_sys.upgrade().unwrap() - } -} - -/// An event within the `EventSys`. -#[derive(Clone, Debug)] -pub struct Event { - /// The time at which the event was created. - created_at: DateTime<Utc>, - /// The value of the Event. - value: Arc<dyn EventValueAny>, -} - -impl Event { - /// Creates a new Event. - pub fn new(value: Arc<dyn EventValueAny>) -> Self { - Self { - created_at: Utc::now(), - value, - } - } -} - -impl std::fmt::Display for Event { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}: {:?}", self.created_at, self.value) - } -} - -pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug { - fn value_as_any(&self) -> &dyn Any; -} - -impl<T: Send + Sync + std::fmt::Debug + Any> EventValueAny for T { - fn value_as_any(&self) -> &dyn Any { - self - } -} - -pub trait EventValue: EventValueAny { - fn id() -> &'static str - where - Self: Sized; -} - -pub trait EventValueTopic: EventValueAny + EventValue { - type Topic; - fn topic() -> Self::Topic - where - Self: Sized; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Hash, PartialEq, Eq, Debug, Clone)] - enum Topic { - TopicA, - TopicB, - TopicC, - TopicD, - TopicE, - } - - #[derive(Clone, Debug, PartialEq)] - struct A { - a_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct B { - b_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct C { - c_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct D { - d_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct E { - e_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct F { - f_value: usize, - } - - impl EventValue for A { - fn id() -> &'static str { - "A" - } - } - - impl EventValue for B { - fn id() -> &'static str { - "B" - } - } - - impl EventValue for C { - fn id() -> &'static str { - "C" - } - } - - impl EventValue for D { - fn id() -> &'static str { - "D" - } - } - - impl EventValue for E { - fn id() -> &'static str { - "E" - } - } - - impl EventValue for F { - fn id() -> &'static str { - "F" - } - } - - impl EventValueTopic for C { - type Topic = Topic; - fn topic() -> Self::Topic { - Topic::TopicC - } - } - - #[test] - fn test_event_sys() { - smol::block_on(async move { - let event_sys = EventSys::<Topic>::new(); - - let a_listener = event_sys.register::<A>(&Topic::TopicA).await; - let b_listener = event_sys.register::<B>(&Topic::TopicB).await; - - event_sys - .emit_by_topic(&Topic::TopicA, &A { a_value: 3 }) - .await; - event_sys - .emit_by_topic(&Topic::TopicB, &B { b_value: 5 }) - .await; - - let msg = a_listener.recv().await.unwrap(); - assert_eq!(msg, A { a_value: 3 }); - - let msg = b_listener.recv().await.unwrap(); - assert_eq!(msg, B { b_value: 5 }); - - // register the same event type to different topics - let c_listener = event_sys.register::<C>(&Topic::TopicC).await; - let d_listener = event_sys.register::<C>(&Topic::TopicD).await; - - event_sys.emit(&C { c_value: 10 }).await; - let msg = c_listener.recv().await.unwrap(); - assert_eq!(msg, C { c_value: 10 }); - - event_sys - .emit_by_topic(&Topic::TopicD, &C { c_value: 10 }) - .await; - let msg = d_listener.recv().await.unwrap(); - assert_eq!(msg, C { c_value: 10 }); - - // register different event types to the same topic - let e_listener = event_sys.register::<E>(&Topic::TopicE).await; - let f_listener = event_sys.register::<F>(&Topic::TopicE).await; - - event_sys - .emit_by_topic(&Topic::TopicE, &E { e_value: 5 }) - .await; - - let msg = e_listener.recv().await.unwrap(); - assert_eq!(msg, E { e_value: 5 }); - - event_sys - .emit_by_topic(&Topic::TopicE, &F { f_value: 5 }) - .await; - - let msg = f_listener.recv().await.unwrap(); - assert_eq!(msg, F { f_value: 5 }); - }); - } -} diff --git a/karyons_core/src/lib.rs b/karyons_core/src/lib.rs deleted file mode 100644 index 83af888..0000000 --- a/karyons_core/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -/// A set of helper tools and functions. -pub mod utils; - -/// A module containing async utilities that work with the `smol` async runtime. -pub mod async_utils; - -/// Represents Karyons's Core Error. -pub mod error; - -/// [`EventSys`](./event/struct.EventSys.html) Implementation -pub mod event; - -/// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html) -pub mod pubsub; - -use error::Result; -use smol::Executor as SmolEx; -use std::sync::Arc; - -/// A wrapper for smol::Executor -pub type Executor<'a> = Arc<SmolEx<'a>>; diff --git a/karyons_core/src/pubsub.rs b/karyons_core/src/pubsub.rs deleted file mode 100644 index 4cc0ab7..0000000 --- a/karyons_core/src/pubsub.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use log::error; -use smol::lock::Mutex; - -use crate::{utils::random_16, Result}; - -pub type ArcPublisher<T> = Arc<Publisher<T>>; -pub type SubscriptionID = u16; - -/// A simple publish-subscribe system. -// # Example -/// -/// ``` -/// use karyons_core::pubsub::{Publisher}; -/// -/// async { -/// let publisher = Publisher::new(); -/// -/// let sub = publisher.subscribe().await; -/// -/// publisher.notify(&String::from("MESSAGE")).await; -/// -/// let msg = sub.recv().await; -/// -/// // .... -/// }; -/// -/// ``` -pub struct Publisher<T> { - subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>, -} - -impl<T: Clone> Publisher<T> { - /// Creates a new Publisher - pub fn new() -> ArcPublisher<T> { - Arc::new(Self { - subs: Mutex::new(HashMap::new()), - }) - } - - /// Subscribe and return a Subscription - pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> { - let mut subs = self.subs.lock().await; - - let chan = smol::channel::unbounded(); - - let mut sub_id = random_16(); - - // While the SubscriptionID already exists, generate a new one - while subs.contains_key(&sub_id) { - sub_id = random_16(); - } - - let sub = Subscription::new(sub_id, self.clone(), chan.1); - subs.insert(sub_id, chan.0); - - sub - } - - /// Unsubscribe from the Publisher - pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) { - self.subs.lock().await.remove(id); - } - - /// Notify all subscribers - pub async fn notify(self: &Arc<Self>, value: &T) { - let mut subs = self.subs.lock().await; - let mut closed_subs = vec![]; - - for (sub_id, sub) in subs.iter() { - if let Err(err) = sub.send(value.clone()).await { - error!("failed to notify {}: {}", sub_id, err); - closed_subs.push(*sub_id); - } - } - - for sub_id in closed_subs.iter() { - subs.remove(sub_id); - } - } -} - -// Subscription -pub struct Subscription<T> { - id: SubscriptionID, - recv_chan: smol::channel::Receiver<T>, - publisher: ArcPublisher<T>, -} - -impl<T: Clone> Subscription<T> { - /// Creates a new Subscription - pub fn new( - id: SubscriptionID, - publisher: ArcPublisher<T>, - recv_chan: smol::channel::Receiver<T>, - ) -> Subscription<T> { - Self { - id, - recv_chan, - publisher, - } - } - - /// Receive a message from the Publisher - pub async fn recv(&self) -> Result<T> { - let msg = self.recv_chan.recv().await?; - Ok(msg) - } - - /// Unsubscribe from the Publisher - pub async fn unsubscribe(&self) { - self.publisher.unsubscribe(&self.id).await; - } -} diff --git a/karyons_core/src/utils/decode.rs b/karyons_core/src/utils/decode.rs deleted file mode 100644 index a8a6522..0000000 --- a/karyons_core/src/utils/decode.rs +++ /dev/null @@ -1,10 +0,0 @@ -use bincode::Decode; - -use crate::Result; - -/// Decodes a given type `T` from the given slice. returns the decoded value -/// along with the number of bytes read. -pub fn decode<T: Decode>(src: &[u8]) -> Result<(T, usize)> { - let (result, bytes_read) = bincode::decode_from_slice(src, bincode::config::standard())?; - Ok((result, bytes_read)) -} diff --git a/karyons_core/src/utils/encode.rs b/karyons_core/src/utils/encode.rs deleted file mode 100644 index 7d1061b..0000000 --- a/karyons_core/src/utils/encode.rs +++ /dev/null @@ -1,15 +0,0 @@ -use bincode::Encode; - -use crate::Result; - -/// Encode the given type `T` into a `Vec<u8>`. -pub fn encode<T: Encode>(msg: &T) -> Result<Vec<u8>> { - let vec = bincode::encode_to_vec(msg, bincode::config::standard())?; - Ok(vec) -} - -/// Encode the given type `T` into the given slice.. -pub fn encode_into_slice<T: Encode>(msg: &T, dst: &mut [u8]) -> Result<()> { - bincode::encode_into_slice(msg, dst, bincode::config::standard())?; - Ok(()) -} diff --git a/karyons_core/src/utils/mod.rs b/karyons_core/src/utils/mod.rs deleted file mode 100644 index a3c3f50..0000000 --- a/karyons_core/src/utils/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -mod decode; -mod encode; -mod path; - -pub use decode::decode; -pub use encode::{encode, encode_into_slice}; -pub use path::{home_dir, tilde_expand}; - -use rand::{rngs::OsRng, Rng}; - -/// Generates and returns a random u32 using `rand::rngs::OsRng`. -pub fn random_32() -> u32 { - OsRng.gen() -} - -/// Generates and returns a random u16 using `rand::rngs::OsRng`. -pub fn random_16() -> u16 { - OsRng.gen() -} diff --git a/karyons_core/src/utils/path.rs b/karyons_core/src/utils/path.rs deleted file mode 100644 index 2cd900a..0000000 --- a/karyons_core/src/utils/path.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::Error, Result}; - -/// Returns the user's home directory as a `PathBuf`. -#[allow(dead_code)] -pub fn home_dir() -> Result<PathBuf> { - dirs::home_dir().ok_or(Error::PathNotFound("Home dir not found")) -} - -/// Expands a tilde (~) in a path and returns the expanded `PathBuf`. -#[allow(dead_code)] -pub fn tilde_expand(path: &str) -> Result<PathBuf> { - match path { - "~" => home_dir(), - p if p.starts_with("~/") => Ok(home_dir()?.join(&path[2..])), - _ => Ok(PathBuf::from(path)), - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_tilde_expand() { - let path = "~/src"; - let expanded_path = dirs::home_dir().unwrap().join("src"); - assert_eq!(tilde_expand(path).unwrap(), expanded_path); - - let path = "~"; - let expanded_path = dirs::home_dir().unwrap(); - assert_eq!(tilde_expand(path).unwrap(), expanded_path); - - let path = ""; - let expanded_path = PathBuf::from(""); - assert_eq!(tilde_expand(path).unwrap(), expanded_path); - } -} |