diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-23 15:57:43 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-07-09 11:46:03 +0200 |
commit | 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch) | |
tree | 3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/subscription_test.go | |
parent | 223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff) |
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new
channel with the fixed buffer size. This caused blocking when the
channel buffer was full, preventing the client from handling additional messages.
Now, there is a `subscriptions` struct that holds a queue for receiving
notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/subscription_test.go')
-rw-r--r-- | jsonrpc/client/subscription_test.go | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/jsonrpc/client/subscription_test.go b/jsonrpc/client/subscription_test.go new file mode 100644 index 0000000..5928665 --- /dev/null +++ b/jsonrpc/client/subscription_test.go @@ -0,0 +1,86 @@ +package client + +import ( + "encoding/json" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSubscriptionFullQueue(t *testing.T) { + bufSize := 100 + sub := newSubscription(1, bufSize) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + defer sub.stop() + for i := 0; i < bufSize+10; i++ { + b, err := json.Marshal(i) + assert.Nil(t, err) + err = sub.notify(b) + if i > bufSize { + if assert.Error(t, err) { + assert.ErrorIs(t, err, queueIsFullErr) + } + } + } + }() + + wg.Wait() +} + +func TestSubscriptionRecv(t *testing.T) { + bufSize := 100 + sub := newSubscription(1, bufSize) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < bufSize; i++ { + b, err := json.Marshal(i) + assert.Nil(t, err) + err = sub.notify(b) + assert.Nil(t, err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for nt := range sub.Recv() { + var v int + err := json.Unmarshal(nt, &v) + assert.Nil(t, err) + assert.Equal(t, v, i) + i += 1 + if i == bufSize { + break + } + } + }() + + wg.Wait() +} + +func TestSubscriptionStop(t *testing.T) { + sub := newSubscription(1, 10) + + sub.stop() + + _, ok := <-sub.Recv() + assert.False(t, ok) + + b, err := json.Marshal(1) + assert.Nil(t, err) + err = sub.notify(b) + if assert.Error(t, err) { + assert.ErrorIs(t, err, subscriptionIsClosedErr) + } +} |