aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/subscription.go
blob: a9a94e7ccf0e2ddb349e7bace120e209fc9ca574 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package client

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync/atomic"

	log "github.com/sirupsen/logrus"
)

// subscriptionIsClosedErr is the sentinel error returned by notify when the
// subscription has already been stopped.
var subscriptionIsClosedErr = errors.New("Subscription is closed")

// Subscription is a subscription established when the client subscribes to a
// method. Incoming notifications are buffered in an internal queue and handed
// to the consumer through the channel returned by Recv.
type Subscription struct {
	// ch is the channel handed out by Recv; the background job sends each
	// queued notification on it.
	ch         chan json.RawMessage
	// ID identifies the subscription; it is attached to the background job's
	// log entries.
	ID         int
	// queue holds notifications pushed by notify until the background job
	// pops them.
	queue      *queue[json.RawMessage]
	// stopSignal is closed by stop to tell the background job to exit.
	stopSignal chan struct{}
	// isClosed is flipped to true exactly once by stop; notify rejects new
	// notifications once it is set.
	isClosed   atomic.Bool
}

// newSubscription builds a Subscription identified by subID, backed by a
// queue created with the given bufferSize, and launches the background job
// that forwards queued notifications to the receive channel before returning.
func newSubscription(subID int, bufferSize int) *Subscription {
	s := new(Subscription)

	// The receive channel is unbuffered; buffering is provided by the queue.
	s.ch = make(chan json.RawMessage)
	s.ID = subID
	s.queue = newQueue[json.RawMessage](bufferSize)
	s.stopSignal = make(chan struct{})

	s.startBackgroundJob()
	return s
}

// Recv returns the receive-only channel on which the subscription's
// notifications are delivered. Each received value is the raw JSON payload of
// one notification.
func (s *Subscription) Recv() <-chan json.RawMessage {
	notifications := s.ch
	return notifications
}

// startBackgroundJob launches the goroutine that moves notifications from the
// queue to the receive channel.
//
// The goroutine exits when the queue reports that no more items will arrive
// (pop returns ok == false) or when the stop signal is observed while it is
// waiting to deliver a message. It is the only sender on s.ch, so it is also
// the one that closes s.ch, on exit.
func (s *Subscription) startBackgroundJob() {
	go func() {
		// Close the receive channel from the sending side. Closing it from
		// stop() instead would race with the send below and could panic with
		// "send on closed channel".
		defer close(s.ch)

		logger := log.WithField("Subscription", s.ID)
		for {
			msg, ok := s.queue.pop()
			if !ok {
				logger.Debug("Background job stopped")
				return
			}
			select {
			case <-s.stopSignal:
				// Debugf (not Debug) is required for the format verb to be
				// applied; %w is only meaningful to fmt.Errorf, so use %v.
				logger.Debugf("Background job stopped: %v", receivedStopSignalErr)
				return
			case s.ch <- msg:
			}
		}
	}()
}

// notify adds a new notification to the queue.
//
// It returns subscriptionIsClosedErr if the subscription has already been
// stopped, or a wrapped error if the queue rejects the push.
func (s *Subscription) notify(nt json.RawMessage) error {
	if s.isClosed.Load() {
		return subscriptionIsClosedErr
	}
	if err := s.queue.push(nt); err != nil {
		return fmt.Errorf("Unable to push new notification: %w", err)
	}
	return nil
}

// stop terminates the subscription: it marks it as closed, signals the
// background job to exit, and clears the queue. Calling stop more than once is
// a no-op after the first call.
//
// The receive channel is not closed here; the background job closes it when
// it exits, so consumers of Recv observe the close shortly after stop.
// NOTE(review): this relies on queue.clear() causing a blocked pop() to return
// ok == false so the background job can exit — confirm against the queue
// implementation.
func (s *Subscription) stop() {
	// Only the first caller proceeds; closing stopSignal twice would panic.
	if !s.isClosed.CompareAndSwap(false, true) {
		return
	}
	close(s.stopSignal)
	s.queue.clear()
}