diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-13 05:52:48 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-13 05:52:48 +0200 |
commit | 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 (patch) | |
tree | e9a34bea9e6fd45d53a4ad1a7a4e75857ad2fe9a /jsonrpc/src/server/response_queue.rs | |
parent | d6a280f69a6685d5b4da5366626fb76a27f0cc07 (diff) |
jsonrpc: spread out comments and clean up
Diffstat (limited to 'jsonrpc/src/server/response_queue.rs')
-rw-r--r-- | jsonrpc/src/server/response_queue.rs | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs new file mode 100644 index 0000000..0d70503 --- /dev/null +++ b/jsonrpc/src/server/response_queue.rs @@ -0,0 +1,40 @@ +use std::{collections::VecDeque, sync::Arc}; + +use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; + +/// A queue for handling responses +pub(super) struct ResponseQueue<T> { + queue: Mutex<VecDeque<T>>, + condvar: CondVar, +} + +impl<T: std::fmt::Debug> ResponseQueue<T> { + pub(super) fn new() -> Arc<Self> { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + pub(super) async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + pub(super) async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.signal(); + } +} |