From 1c27f751c30196e2c421ae420dacbc4ff25f0fc7 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 13 Jun 2024 05:52:48 +0200 Subject: jsonrpc: spread out comments and clean up --- jsonrpc/src/server/response_queue.rs | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 jsonrpc/src/server/response_queue.rs (limited to 'jsonrpc/src/server/response_queue.rs') diff --git a/jsonrpc/src/server/response_queue.rs b/jsonrpc/src/server/response_queue.rs new file mode 100644 index 0000000..0d70503 --- /dev/null +++ b/jsonrpc/src/server/response_queue.rs @@ -0,0 +1,40 @@ +use std::{collections::VecDeque, sync::Arc}; + +use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; + +/// A queue for handling responses +pub(super) struct ResponseQueue { + queue: Mutex>, + condvar: CondVar, +} + +impl ResponseQueue { + pub(super) fn new() -> Arc { + Arc::new(Self { + queue: Mutex::new(VecDeque::new()), + condvar: CondVar::new(), + }) + } + + /// Wait while the queue is empty, remove and return the item from the queue, + /// panicking if empty (shouldn't happen) + pub(super) async fn recv(&self) -> T { + let mut queue = self.queue.lock().await; + + while queue.is_empty() { + queue = self.condvar.wait(queue).await; + } + + match queue.pop_front() { + Some(v) => v, + None => unreachable!(), + } + } + + /// Push an item into the queue, notify all waiting tasks that the + /// condvar has changed + pub(super) async fn push(&self, res: T) { + self.queue.lock().await.push_back(res); + self.condvar.signal(); + } +} -- cgit v1.2.3