aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/subscription_test.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-23 15:57:43 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-09 11:46:03 +0200
commit6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch)
tree3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/subscription_test.go
parent223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff)
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/subscription_test.go')
-rw-r--r--jsonrpc/client/subscription_test.go86
1 files changed, 86 insertions, 0 deletions
diff --git a/jsonrpc/client/subscription_test.go b/jsonrpc/client/subscription_test.go
new file mode 100644
index 0000000..5928665
--- /dev/null
+++ b/jsonrpc/client/subscription_test.go
@@ -0,0 +1,86 @@
+package client
+
+import (
+ "encoding/json"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSubscriptionFullQueue(t *testing.T) {
+ bufSize := 100
+ sub := newSubscription(1, bufSize)
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer sub.stop()
+ for i := 0; i < bufSize+10; i++ {
+ b, err := json.Marshal(i)
+ assert.Nil(t, err)
+ err = sub.notify(b)
+ if i > bufSize {
+ if assert.Error(t, err) {
+ assert.ErrorIs(t, err, queueIsFullErr)
+ }
+ }
+ }
+ }()
+
+ wg.Wait()
+}
+
+func TestSubscriptionRecv(t *testing.T) {
+ bufSize := 100
+ sub := newSubscription(1, bufSize)
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < bufSize; i++ {
+ b, err := json.Marshal(i)
+ assert.Nil(t, err)
+ err = sub.notify(b)
+ assert.Nil(t, err)
+ }
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ i := 0
+ for nt := range sub.Recv() {
+ var v int
+ err := json.Unmarshal(nt, &v)
+ assert.Nil(t, err)
+ assert.Equal(t, v, i)
+ i += 1
+ if i == bufSize {
+ break
+ }
+ }
+ }()
+
+ wg.Wait()
+}
+
+func TestSubscriptionStop(t *testing.T) {
+ sub := newSubscription(1, 10)
+
+ sub.stop()
+
+ _, ok := <-sub.Recv()
+ assert.False(t, ok)
+
+ b, err := json.Marshal(1)
+ assert.Nil(t, err)
+ err = sub.notify(b)
+ if assert.Error(t, err) {
+ assert.ErrorIs(t, err, subscriptionIsClosedErr)
+ }
+}