diff options
Diffstat (limited to 'jsonrpc/client/subscription.go')
-rw-r--r-- | jsonrpc/client/subscription.go | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/jsonrpc/client/subscription.go b/jsonrpc/client/subscription.go new file mode 100644 index 0000000..a9a94e7 --- /dev/null +++ b/jsonrpc/client/subscription.go @@ -0,0 +1,83 @@ +package client + +import ( + "encoding/json" + "errors" + "fmt" + "sync/atomic" + + log "github.com/sirupsen/logrus" +) + +var ( + subscriptionIsClosedErr = errors.New("Subscription is closed") +) + +// Subscription A subscription established when the client's subscribe to a method +type Subscription struct { + ch chan json.RawMessage + ID int + queue *queue[json.RawMessage] + stopSignal chan struct{} + isClosed atomic.Bool +} + +// newSubscription Creates a new Subscription +func newSubscription(subID int, bufferSize int) *Subscription { + sub := &Subscription{ + ch: make(chan json.RawMessage), + ID: subID, + queue: newQueue[json.RawMessage](bufferSize), + stopSignal: make(chan struct{}), + } + sub.startBackgroundJob() + + return sub +} + +// Recv Receives a new notification. +func (s *Subscription) Recv() <-chan json.RawMessage { + return s.ch +} + +// startBackgroundJob starts waiting for the queue to receive new items. +// It stops when it receives a stop signal. +func (s *Subscription) startBackgroundJob() { + go func() { + logger := log.WithField("Subscription", s.ID) + for { + msg, ok := s.queue.pop() + if !ok { + logger.Debug("Background job stopped") + return + } + select { + case <-s.stopSignal: + logger.Debug("Background job stopped: %w", receivedStopSignalErr) + return + case s.ch <- msg: + } + } + }() +} + +// notify adds a new notification to the queue. +func (s *Subscription) notify(nt json.RawMessage) error { + if s.isClosed.Load() { + return subscriptionIsClosedErr + } + if err := s.queue.push(nt); err != nil { + return fmt.Errorf("Unable to push new notification: %w", err) + } + return nil +} + +// stop Terminates the subscription, clears the queue, and closes channels. +func (s *Subscription) stop() { + if !s.isClosed.CompareAndSwap(false, true) { + return + } + close(s.stopSignal) + close(s.ch) + s.queue.clear() +} |