aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/concurrent_queue.go
blob: daf92cd863eb446e6854dd6f5518d6e75281e4c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package client

import (
	"errors"
	"sync"
	"sync/atomic"
)

// queue A concurrent queue.
type queue[T any] struct {
	lock       sync.Mutex
	cond       *sync.Cond
	items      []T
	bufferSize int
	stopSignal chan struct{}
	isClosed   atomic.Bool
}

var (
	queueIsFullErr   = errors.New("Queue is full")
	queueIsClosedErr = errors.New("Queue is closed")
)

// newQueue creates a new queue with the specified buffer size.
func newQueue[T any](bufferSize int) *queue[T] {
	q := &queue[T]{
		bufferSize: bufferSize,
		items:      make([]T, 0),
		stopSignal: make(chan struct{}),
	}
	q.cond = sync.NewCond(&q.lock)
	return q
}

// push Adds a new item to the queue and returns an error if the queue
// is full.
func (q *queue[T]) push(item T) error {
	if q.isClosed.Load() {
		return queueIsClosedErr
	}

	q.lock.Lock()
	defer q.lock.Unlock()
	if len(q.items) >= q.bufferSize {
		return queueIsFullErr
	}

	q.items = append(q.items, item)
	q.cond.Signal()
	return nil
}

// pop waits for and removes the first element from the queue, then returns it.
func (q *queue[T]) pop() (T, bool) {
	var t T
	if q.isClosed.Load() {
		return t, false
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	// Wait for an item to be available or for a stop signal.
	// This ensures that the waiting stops once the queue is cleared.
	for len(q.items) == 0 {
		select {
		case <-q.stopSignal:
			return t, false
		default:
			q.cond.Wait()
		}
	}

	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

// clear Clears all elements from the queue.
func (q *queue[T]) clear() {
	if !q.isClosed.CompareAndSwap(false, true) {
		return
	}
	q.lock.Lock()
	defer q.lock.Unlock()
	close(q.stopSignal)
	q.items = nil
	q.cond.Broadcast()
}