diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-23 15:57:43 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-07-09 11:46:03 +0200 |
commit | 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch) | |
tree | 3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/concurrent_queue.go | |
parent | 223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff) |
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new
channel with the fixed buffer size. This caused blocking when the
channel buffer was full, preventing the client from handling additional messages.
Now, there is a `subscriptions` struct that holds a queue for receiving
notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/concurrent_queue.go')
-rw-r--r-- | jsonrpc/client/concurrent_queue.go | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/jsonrpc/client/concurrent_queue.go b/jsonrpc/client/concurrent_queue.go new file mode 100644 index 0000000..daf92cd --- /dev/null +++ b/jsonrpc/client/concurrent_queue.go @@ -0,0 +1,88 @@ +package client + +import ( + "errors" + "sync" + "sync/atomic" +) + +// queue A concurrent queue. +type queue[T any] struct { + lock sync.Mutex + cond *sync.Cond + items []T + bufferSize int + stopSignal chan struct{} + isClosed atomic.Bool +} + +var ( + queueIsFullErr = errors.New("Queue is full") + queueIsClosedErr = errors.New("Queue is closed") +) + +// newQueue creates a new queue with the specified buffer size. +func newQueue[T any](bufferSize int) *queue[T] { + q := &queue[T]{ + bufferSize: bufferSize, + items: make([]T, 0), + stopSignal: make(chan struct{}), + } + q.cond = sync.NewCond(&q.lock) + return q +} + +// push Adds a new item to the queue and returns an error if the queue +// is full. +func (q *queue[T]) push(item T) error { + if q.isClosed.Load() { + return queueIsClosedErr + } + + q.lock.Lock() + defer q.lock.Unlock() + if len(q.items) >= q.bufferSize { + return queueIsFullErr + } + + q.items = append(q.items, item) + q.cond.Signal() + return nil +} + +// pop waits for and removes the first element from the queue, then returns it. +func (q *queue[T]) pop() (T, bool) { + var t T + if q.isClosed.Load() { + return t, false + } + q.lock.Lock() + defer q.lock.Unlock() + + // Wait for an item to be available or for a stop signal. + // This ensures that the waiting stops once the queue is cleared. + for len(q.items) == 0 { + select { + case <-q.stopSignal: + return t, false + default: + q.cond.Wait() + } + } + + item := q.items[0] + q.items = q.items[1:] + return item, true +} + +// clear Clears all elements from the queue. +func (q *queue[T]) clear() { + if !q.isClosed.CompareAndSwap(false, true) { + return + } + q.lock.Lock() + defer q.lock.Unlock() + close(q.stopSignal) + q.items = nil + q.cond.Broadcast() +} |