aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/response_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server/response_queue.rs')
-rw-r--r--jsonrpc/src/server/response_queue.rs40
1 files changed, 40 insertions, 0 deletions
diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs
new file mode 100644
index 0000000..0d70503
--- /dev/null
+++ b/jsonrpc/src/server/response_queue.rs
@@ -0,0 +1,40 @@
+use std::{collections::VecDeque, sync::Arc};
+
+use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
+
+/// A queue for handling responses
+pub(super) struct ResponseQueue<T> {
+ queue: Mutex<VecDeque<T>>,
+ condvar: CondVar,
+}
+
+impl<T: std::fmt::Debug> ResponseQueue<T> {
+ pub(super) fn new() -> Arc<Self> {
+ Arc::new(Self {
+ queue: Mutex::new(VecDeque::new()),
+ condvar: CondVar::new(),
+ })
+ }
+
+ /// Wait while the queue is empty, remove and return the item from the queue,
+ /// panicking if empty (shouldn't happen)
+ pub(super) async fn recv(&self) -> T {
+ let mut queue = self.queue.lock().await;
+
+ while queue.is_empty() {
+ queue = self.condvar.wait(queue).await;
+ }
+
+ match queue.pop_front() {
+ Some(v) => v,
+ None => unreachable!(),
+ }
+ }
+
+ /// Push an item into the queue, notify all waiting tasks that the
+ /// condvar has changed
+ pub(super) async fn push(&self, res: T) {
+ self.queue.lock().await.push_back(res);
+ self.condvar.signal();
+ }
+}