aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/concurrent_queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/client/concurrent_queue.go')
-rw-r--r--jsonrpc/client/concurrent_queue.go88
1 files changed, 88 insertions, 0 deletions
diff --git a/jsonrpc/client/concurrent_queue.go b/jsonrpc/client/concurrent_queue.go
new file mode 100644
index 0000000..daf92cd
--- /dev/null
+++ b/jsonrpc/client/concurrent_queue.go
@@ -0,0 +1,88 @@
+package client
+
+import (
+ "errors"
+ "sync"
+ "sync/atomic"
+)
+
+// queue A concurrent queue.
+type queue[T any] struct {
+ lock sync.Mutex
+ cond *sync.Cond
+ items []T
+ bufferSize int
+ stopSignal chan struct{}
+ isClosed atomic.Bool
+}
+
+var (
+ queueIsFullErr = errors.New("Queue is full")
+ queueIsClosedErr = errors.New("Queue is closed")
+)
+
+// newQueue creates a new queue with the specified buffer size.
+func newQueue[T any](bufferSize int) *queue[T] {
+ q := &queue[T]{
+ bufferSize: bufferSize,
+ items: make([]T, 0),
+ stopSignal: make(chan struct{}),
+ }
+ q.cond = sync.NewCond(&q.lock)
+ return q
+}
+
+// push Adds a new item to the queue and returns an error if the queue
+// is full.
+func (q *queue[T]) push(item T) error {
+ if q.isClosed.Load() {
+ return queueIsClosedErr
+ }
+
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if len(q.items) >= q.bufferSize {
+ return queueIsFullErr
+ }
+
+ q.items = append(q.items, item)
+ q.cond.Signal()
+ return nil
+}
+
+// pop waits for and removes the first element from the queue, then returns it.
+func (q *queue[T]) pop() (T, bool) {
+ var t T
+ if q.isClosed.Load() {
+ return t, false
+ }
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Wait for an item to be available or for a stop signal.
+ // This ensures that the waiting stops once the queue is cleared.
+ for len(q.items) == 0 {
+ select {
+ case <-q.stopSignal:
+ return t, false
+ default:
+ q.cond.Wait()
+ }
+ }
+
+ item := q.items[0]
+ q.items = q.items[1:]
+ return item, true
+}
+
+// clear Clears all elements from the queue.
+func (q *queue[T]) clear() {
+ if !q.isClosed.CompareAndSwap(false, true) {
+ return
+ }
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ close(q.stopSignal)
+ q.items = nil
+ q.cond.Broadcast()
+}