From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/message_dispatcher_test.go | 49 ++++++++++++++++--------------- 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'jsonrpc/client/message_dispatcher_test.go') diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go index a1fc1c6..4d7cc60 100644 --- a/jsonrpc/client/message_dispatcher_test.go +++ b/jsonrpc/client/message_dispatcher_test.go @@ -5,58 +5,61 @@ import ( "sync/atomic" "testing" + "github.com/karyontech/karyon-go/jsonrpc/message" "github.com/stretchr/testify/assert" ) func TestDispatchToChannel(t *testing.T) { - messageDispatcher := newMessageDispatcher[int, int](10) + messageDispatcher := newMessageDispatcher() - chanKey := 1 - rx := messageDispatcher.register(chanKey) + req1 := "1" + rx := messageDispatcher.register(req1) - chanKey2 := 2 - rx2 := messageDispatcher.register(chanKey2) + req2 := "2" + rx2 := messageDispatcher.register(req2) var wg sync.WaitGroup wg.Add(1) go func() { + defer wg.Done() for i := 0; i < 50; i++ { - err := messageDispatcher.dispatch(chanKey, i) + res := message.Response{ID: &req1} + err := messageDispatcher.dispatch(req1, res) assert.Nil(t, err) } - messageDispatcher.unregister(chanKey) - wg.Done() + messageDispatcher.unregister(req1) }() wg.Add(1) go func() { + defer wg.Done() for i := 0; i < 50; i++ { - err := messageDispatcher.dispatch(chanKey2, i) + res := message.Response{ID: &req2} + err := messageDispatcher.dispatch(req2, res) assert.Nil(t, err) } - messageDispatcher.unregister(chanKey2) - wg.Done() + messageDispatcher.unregister(req2) }() var receivedItem atomic.Int32 wg.Add(1) go func() { + defer wg.Done() for range rx { receivedItem.Add(1) } - wg.Done() }() wg.Add(1) go func() { + defer wg.Done() for range rx2 { receivedItem.Add(1) } - wg.Done() }() wg.Wait() @@ -64,33 +67,31 @@ func TestDispatchToChannel(t *testing.T) { } func TestUnregisterChannel(t *testing.T) { - messageDispatcher := newMessageDispatcher[int, int](1) + messageDispatcher := newMessageDispatcher() - chanKey := 1 - rx := messageDispatcher.register(chanKey) + req := "1" + rx := messageDispatcher.register(req) - messageDispatcher.unregister(chanKey) - assert.Equal(t, messageDispatcher.length(), 0, "channels should be empty") + messageDispatcher.unregister(req) _, ok := <-rx assert.False(t, ok, "chan closed") - err := messageDispatcher.dispatch(chanKey, 1) + err := messageDispatcher.dispatch(req, message.Response{ID: &req}) assert.NotNil(t, err) } func TestClearChannels(t *testing.T) { - messageDispatcher := newMessageDispatcher[int, int](1) + messageDispatcher := newMessageDispatcher() - chanKey := 1 - rx := messageDispatcher.register(chanKey) + req := "1" + rx := messageDispatcher.register(req) messageDispatcher.clear() - assert.Equal(t, messageDispatcher.length(), 0, "channels should be empty") _, ok := <-rx assert.False(t, ok, "chan closed") - err := messageDispatcher.dispatch(chanKey, 1) + err := messageDispatcher.dispatch(req, message.Response{ID: &req}) assert.NotNil(t, err) } -- cgit v1.2.3