From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/subscription.go | 83 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 jsonrpc/client/subscription.go (limited to 'jsonrpc/client/subscription.go') diff --git a/jsonrpc/client/subscription.go b/jsonrpc/client/subscription.go new file mode 100644 index 0000000..a9a94e7 --- /dev/null +++ b/jsonrpc/client/subscription.go @@ -0,0 +1,83 @@ +package client + +import ( + "encoding/json" + "errors" + "fmt" + "sync/atomic" + + log "github.com/sirupsen/logrus" +) + +var ( + subscriptionIsClosedErr = errors.New("Subscription is closed") +) + +// Subscription A subscription established when the client's subscribe to a method +type Subscription struct { + ch chan json.RawMessage + ID int + queue *queue[json.RawMessage] + stopSignal chan struct{} + isClosed atomic.Bool +} + +// newSubscription Creates a new Subscription +func newSubscription(subID int, bufferSize int) *Subscription { + sub := &Subscription{ + ch: make(chan json.RawMessage), + ID: subID, + queue: newQueue[json.RawMessage](bufferSize), + stopSignal: make(chan struct{}), + } + sub.startBackgroundJob() + + return sub +} + +// Recv Receives a new notification. +func (s *Subscription) Recv() <-chan json.RawMessage { + return s.ch +} + +// startBackgroundJob starts waiting for the queue to receive new items. +// It stops when it receives a stop signal. +func (s *Subscription) startBackgroundJob() { + go func() { + logger := log.WithField("Subscription", s.ID) + for { + msg, ok := s.queue.pop() + if !ok { + logger.Debug("Background job stopped") + return + } + select { + case <-s.stopSignal: + logger.Debug("Background job stopped: %w", receivedStopSignalErr) + return + case s.ch <- msg: + } + } + }() +} + +// notify adds a new notification to the queue. +func (s *Subscription) notify(nt json.RawMessage) error { + if s.isClosed.Load() { + return subscriptionIsClosedErr + } + if err := s.queue.push(nt); err != nil { + return fmt.Errorf("Unable to push new notification: %w", err) + } + return nil +} + +// stop Terminates the subscription, clears the queue, and closes channels. +func (s *Subscription) stop() { + if !s.isClosed.CompareAndSwap(false, true) { + return + } + close(s.stopSignal) + close(s.ch) + s.queue.clear() +} -- cgit v1.2.3