From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/subscriptions_test.go | 87 ++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 jsonrpc/client/subscriptions_test.go (limited to 'jsonrpc/client/subscriptions_test.go') diff --git a/jsonrpc/client/subscriptions_test.go b/jsonrpc/client/subscriptions_test.go new file mode 100644 index 0000000..cb5705a --- /dev/null +++ b/jsonrpc/client/subscriptions_test.go @@ -0,0 +1,87 @@ +package client + +import ( + "encoding/json" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSubscriptionsSubscribe(t *testing.T) { + bufSize := 100 + subs := newSubscriptions(bufSize) + + var receivedNotifications atomic.Int32 + + var wg sync.WaitGroup + + runSubNotify := func(sub *Subscription) { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < bufSize; i++ { + b, err := json.Marshal(i) + assert.Nil(t, err) + err = sub.notify(b) + assert.Nil(t, err) + } + }() + } + + runSubRecv := func(sub *Subscription) { + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for nt := range sub.Recv() { + var v int + err := json.Unmarshal(nt, &v) + assert.Nil(t, err) + assert.Equal(t, v, i) + receivedNotifications.Add(1) + i += 1 + if i == bufSize { + break + } + } + sub.stop() + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + sub := subs.subscribe(i) + runSubNotify(sub) + runSubRecv(sub) + } + }() + + wg.Wait() + assert.Equal(t, receivedNotifications.Load(), int32(bufSize*3)) +} + +func TestSubscriptionsUnsubscribe(t *testing.T) { + bufSize := 100 + subs := newSubscriptions(bufSize) + + var wg sync.WaitGroup + + sub := subs.subscribe(1) + subs.unsubscribe(1) + + _, ok := <-sub.Recv() + assert.False(t, ok) + + b, err := json.Marshal(1) + assert.Nil(t, err) + err = sub.notify(b) + if assert.Error(t, err) { + assert.ErrorIs(t, err, subscriptionIsClosedErr) + } + + wg.Wait() +} -- cgit v1.2.3