blob: 0d7050360cc08202fef4609c599e803d4a5be8af (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
use std::{collections::VecDeque, sync::Arc};
use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
/// A queue for handling responses
pub(super) struct ResponseQueue<T> {
queue: Mutex<VecDeque<T>>,
condvar: CondVar,
}
impl<T: std::fmt::Debug> ResponseQueue<T> {
pub(super) fn new() -> Arc<Self> {
Arc::new(Self {
queue: Mutex::new(VecDeque::new()),
condvar: CondVar::new(),
})
}
/// Wait while the queue is empty, remove and return the item from the queue,
/// panicking if empty (shouldn't happen)
pub(super) async fn recv(&self) -> T {
let mut queue = self.queue.lock().await;
while queue.is_empty() {
queue = self.condvar.wait(queue).await;
}
match queue.pop_front() {
Some(v) => v,
None => unreachable!(),
}
}
/// Push an item into the queue, notify all waiting tasks that the
/// condvar has changed
pub(super) async fn push(&self, res: T) {
self.queue.lock().await.push_back(res);
self.condvar.signal();
}
}
|