From 98a1de91a2dae06323558422c239e5a45fc86e7b Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 Nov 2023 22:41:33 +0300 Subject: implement TLS for inbound and outbound connections --- core/src/async_util/condwait.rs | 133 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 core/src/async_util/condwait.rs (limited to 'core/src/async_util/condwait.rs') diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs new file mode 100644 index 0000000..cd4b269 --- /dev/null +++ b/core/src/async_util/condwait.rs @@ -0,0 +1,133 @@ +use smol::lock::Mutex; + +use super::CondVar; + +/// CondWait is a wrapper struct for CondVar with a Mutex boolean flag. +/// +/// # Example +/// +///``` +/// use std::sync::Arc; +/// +/// use karyons_core::async_util::CondWait; +/// +/// async { +/// let cond_wait = Arc::new(CondWait::new()); +/// let cond_wait_cloned = cond_wait.clone(); +/// let task = smol::spawn(async move { +/// cond_wait_cloned.wait().await; +/// // ... +/// }); +/// +/// cond_wait.signal().await; +/// }; +/// +/// ``` +/// +pub struct CondWait { + /// The CondVar + condvar: CondVar, + /// Boolean flag + w: Mutex, +} + +impl CondWait { + /// Creates a new CondWait. + pub fn new() -> Self { + Self { + condvar: CondVar::new(), + w: Mutex::new(false), + } + } + + /// Waits for a signal or broadcast. + pub async fn wait(&self) { + let mut w = self.w.lock().await; + + // While the boolean flag is false, wait for a signal. + while !*w { + w = self.condvar.wait(w).await; + } + } + + /// Signal a waiting task. + pub async fn signal(&self) { + *self.w.lock().await = true; + self.condvar.signal(); + } + + /// Signal all waiting tasks. + pub async fn broadcast(&self) { + *self.w.lock().await = true; + self.condvar.broadcast(); + } + + /// Reset the boolean flag value to false. + pub async fn reset(&self) { + *self.w.lock().await = false; + } +} + +impl Default for CondWait { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + #[test] + fn test_cond_wait() { + smol::block_on(async { + let cond_wait = Arc::new(CondWait::new()); + let count = Arc::new(AtomicUsize::new(0)); + + let cond_wait_cloned = cond_wait.clone(); + let count_cloned = count.clone(); + let task = smol::spawn(async move { + cond_wait_cloned.wait().await; + count_cloned.fetch_add(1, Ordering::Relaxed); + // do something + }); + + // Send a signal to the waiting task + cond_wait.signal().await; + + task.await; + + // Reset the boolean flag + cond_wait.reset().await; + + assert_eq!(count.load(Ordering::Relaxed), 1); + + let cond_wait_cloned = cond_wait.clone(); + let count_cloned = count.clone(); + let task1 = smol::spawn(async move { + cond_wait_cloned.wait().await; + count_cloned.fetch_add(1, Ordering::Relaxed); + // do something + }); + + let cond_wait_cloned = cond_wait.clone(); + let count_cloned = count.clone(); + let task2 = smol::spawn(async move { + cond_wait_cloned.wait().await; + count_cloned.fetch_add(1, Ordering::Relaxed); + // do something + }); + + // Broadcast a signal to all waiting tasks + cond_wait.broadcast().await; + + task1.await; + task2.await; + assert_eq!(count.load(Ordering::Relaxed), 3); + }); + } +} -- cgit v1.2.3