diff options
Diffstat (limited to 'jsonrpc/src/server/response_queue.rs')
-rw-r--r-- | jsonrpc/src/server/response_queue.rs | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs new file mode 100644 index 0000000..0d70503 --- /dev/null +++ b/jsonrpc/src/server/response_queue.rs @@ -0,0 +1,40 @@ +use std::{collections::VecDeque, sync::Arc}; + +use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; + +/// A queue for handling responses +pub(super) struct ResponseQueue<T> { + queue: Mutex<VecDeque<T>>, + condvar: CondVar, +} + +impl<T: std::fmt::Debug> ResponseQueue<T> { + pub(super) fn new() -> Arc<Self> { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + pub(super) async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + pub(super) async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.signal(); + } +} |