blob: a9a94e7ccf0e2ddb349e7bace120e209fc9ca574 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package client
import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
log "github.com/sirupsen/logrus"
)
var (
subscriptionIsClosedErr = errors.New("Subscription is closed")
)
// Subscription A subscription established when the client's subscribe to a method
type Subscription struct {
ch chan json.RawMessage
ID int
queue *queue[json.RawMessage]
stopSignal chan struct{}
isClosed atomic.Bool
}
// newSubscription Creates a new Subscription
func newSubscription(subID int, bufferSize int) *Subscription {
sub := &Subscription{
ch: make(chan json.RawMessage),
ID: subID,
queue: newQueue[json.RawMessage](bufferSize),
stopSignal: make(chan struct{}),
}
sub.startBackgroundJob()
return sub
}
// Recv Receives a new notification.
func (s *Subscription) Recv() <-chan json.RawMessage {
return s.ch
}
// startBackgroundJob starts waiting for the queue to receive new items.
// It stops when it receives a stop signal.
func (s *Subscription) startBackgroundJob() {
go func() {
logger := log.WithField("Subscription", s.ID)
for {
msg, ok := s.queue.pop()
if !ok {
logger.Debug("Background job stopped")
return
}
select {
case <-s.stopSignal:
logger.Debug("Background job stopped: %w", receivedStopSignalErr)
return
case s.ch <- msg:
}
}
}()
}
// notify adds a new notification to the queue.
func (s *Subscription) notify(nt json.RawMessage) error {
if s.isClosed.Load() {
return subscriptionIsClosedErr
}
if err := s.queue.push(nt); err != nil {
return fmt.Errorf("Unable to push new notification: %w", err)
}
return nil
}
// stop Terminates the subscription, clears the queue, and closes channels.
func (s *Subscription) stop() {
if !s.isClosed.CompareAndSwap(false, true) {
return
}
close(s.stopSignal)
close(s.ch)
s.queue.clear()
}
|