diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-06-23 15:57:43 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-07-09 11:46:03 +0200 | 
| commit | 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch) | |
| tree | 3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/message_dispatcher_test.go | |
| parent | 223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff) | |
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new
channel with the fixed buffer size. This caused blocking when the
channel buffer was full, preventing the client from handling additional messages.
Now, there is a `subscriptions` struct that holds a queue for receiving
notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/message_dispatcher_test.go')
| -rw-r--r-- | jsonrpc/client/message_dispatcher_test.go | 49 | 
1 files changed, 25 insertions, 24 deletions
diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go index a1fc1c6..4d7cc60 100644 --- a/jsonrpc/client/message_dispatcher_test.go +++ b/jsonrpc/client/message_dispatcher_test.go @@ -5,58 +5,61 @@ import (  	"sync/atomic"  	"testing" +	"github.com/karyontech/karyon-go/jsonrpc/message"  	"github.com/stretchr/testify/assert"  )  func TestDispatchToChannel(t *testing.T) { -	messageDispatcher := newMessageDispatcher[int, int](10) +	messageDispatcher := newMessageDispatcher() -	chanKey := 1 -	rx := messageDispatcher.register(chanKey) +	req1 := "1" +	rx := messageDispatcher.register(req1) -	chanKey2 := 2 -	rx2 := messageDispatcher.register(chanKey2) +	req2 := "2" +	rx2 := messageDispatcher.register(req2)  	var wg sync.WaitGroup  	wg.Add(1)  	go func() { +		defer wg.Done()  		for i := 0; i < 50; i++ { -			err := messageDispatcher.dispatch(chanKey, i) +			res := message.Response{ID: &req1} +			err := messageDispatcher.dispatch(req1, res)  			assert.Nil(t, err)  		} -		messageDispatcher.unregister(chanKey) -		wg.Done() +		messageDispatcher.unregister(req1)  	}()  	wg.Add(1)  	go func() { +		defer wg.Done()  		for i := 0; i < 50; i++ { -			err := messageDispatcher.dispatch(chanKey2, i) +			res := message.Response{ID: &req2} +			err := messageDispatcher.dispatch(req2, res)  			assert.Nil(t, err)  		} -		messageDispatcher.unregister(chanKey2) -		wg.Done() +		messageDispatcher.unregister(req2)  	}()  	var receivedItem atomic.Int32  	wg.Add(1)  	go func() { +		defer wg.Done()  		for range rx {  			receivedItem.Add(1)  		} -		wg.Done()  	}()  	wg.Add(1)  	go func() { +		defer wg.Done()  		for range rx2 {  			receivedItem.Add(1)  		} -		wg.Done()  	}()  	wg.Wait() @@ -64,33 +67,31 @@ func TestDispatchToChannel(t *testing.T) {  }  func TestUnregisterChannel(t *testing.T) { -	messageDispatcher := newMessageDispatcher[int, int](1) +	messageDispatcher := newMessageDispatcher() -	chanKey := 1 -	rx := messageDispatcher.register(chanKey) +	req := "1" +	rx := messageDispatcher.register(req) -	messageDispatcher.unregister(chanKey) -	assert.Equal(t, messageDispatcher.length(), 0, "channels should be empty") +	messageDispatcher.unregister(req)  	_, ok := <-rx  	assert.False(t, ok, "chan closed") -	err := messageDispatcher.dispatch(chanKey, 1) +	err := messageDispatcher.dispatch(req, message.Response{ID: &req})  	assert.NotNil(t, err)  }  func TestClearChannels(t *testing.T) { -	messageDispatcher := newMessageDispatcher[int, int](1) +	messageDispatcher := newMessageDispatcher() -	chanKey := 1 -	rx := messageDispatcher.register(chanKey) +	req := "1" +	rx := messageDispatcher.register(req)  	messageDispatcher.clear() -	assert.Equal(t, messageDispatcher.length(), 0, "channels should be empty")  	_, ok := <-rx  	assert.False(t, ok, "chan closed") -	err := messageDispatcher.dispatch(chanKey, 1) +	err := messageDispatcher.dispatch(req, message.Response{ID: &req})  	assert.NotNil(t, err)  }  | 
