aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/subscription.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-23 15:57:43 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-09 11:46:03 +0200
commit6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch)
tree3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/subscription.go
parent223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff)
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/subscription.go')
-rw-r--r--jsonrpc/client/subscription.go83
1 files changed, 83 insertions, 0 deletions
diff --git a/jsonrpc/client/subscription.go b/jsonrpc/client/subscription.go
new file mode 100644
index 0000000..a9a94e7
--- /dev/null
+++ b/jsonrpc/client/subscription.go
@@ -0,0 +1,83 @@
+package client
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "sync/atomic"
+
+ log "github.com/sirupsen/logrus"
+)
+
+var (
+ subscriptionIsClosedErr = errors.New("Subscription is closed")
+)
+
+// Subscription A subscription established when the client's subscribe to a method
+type Subscription struct {
+ ch chan json.RawMessage
+ ID int
+ queue *queue[json.RawMessage]
+ stopSignal chan struct{}
+ isClosed atomic.Bool
+}
+
+// newSubscription Creates a new Subscription
+func newSubscription(subID int, bufferSize int) *Subscription {
+ sub := &Subscription{
+ ch: make(chan json.RawMessage),
+ ID: subID,
+ queue: newQueue[json.RawMessage](bufferSize),
+ stopSignal: make(chan struct{}),
+ }
+ sub.startBackgroundJob()
+
+ return sub
+}
+
+// Recv Receives a new notification.
+func (s *Subscription) Recv() <-chan json.RawMessage {
+ return s.ch
+}
+
+// startBackgroundJob starts waiting for the queue to receive new items.
+// It stops when it receives a stop signal.
+func (s *Subscription) startBackgroundJob() {
+ go func() {
+ logger := log.WithField("Subscription", s.ID)
+ for {
+ msg, ok := s.queue.pop()
+ if !ok {
+ logger.Debug("Background job stopped")
+ return
+ }
+ select {
+ case <-s.stopSignal:
+ logger.Debug("Background job stopped: %w", receivedStopSignalErr)
+ return
+ case s.ch <- msg:
+ }
+ }
+ }()
+}
+
+// notify adds a new notification to the queue.
+func (s *Subscription) notify(nt json.RawMessage) error {
+ if s.isClosed.Load() {
+ return subscriptionIsClosedErr
+ }
+ if err := s.queue.push(nt); err != nil {
+ return fmt.Errorf("Unable to push new notification: %w", err)
+ }
+ return nil
+}
+
+// stop Terminates the subscription, clears the queue, and closes channels.
+func (s *Subscription) stop() {
+ if !s.isClosed.CompareAndSwap(false, true) {
+ return
+ }
+ close(s.stopSignal)
+ close(s.ch)
+ s.queue.clear()
+}