From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/message_dispatcher.go | 64 +++++++++++++++++------------------- 1 file changed, 31 insertions(+), 33 deletions(-) (limited to 'jsonrpc/client/message_dispatcher.go') diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go index 9177f6e..adedd1c 100644 --- a/jsonrpc/client/message_dispatcher.go +++ b/jsonrpc/client/message_dispatcher.go @@ -1,76 +1,74 @@ package client import ( - "fmt" + "errors" "sync" + + "github.com/karyontech/karyon-go/jsonrpc/message" +) + +var ( + requestChannelNotFoundErr = errors.New("Request channel not found") ) -// messageDispatcher Is a generic structure that holds a map of keys and +// messageDispatcher Is a structure that holds a map of request IDs and // channels, and it is protected by mutex -type messageDispatcher[K comparable, V any] struct { +type messageDispatcher struct { sync.Mutex - chans map[K]chan<- V - bufferSize int + chans map[message.RequestID]chan<- message.Response } // newMessageDispatcher Creates a new messageDispatcher -func newMessageDispatcher[K comparable, V any](bufferSize int) *messageDispatcher[K, V] { - chans := make(map[K]chan<- V) - return &messageDispatcher[K, V]{ - chans: chans, - bufferSize: bufferSize, +func newMessageDispatcher() *messageDispatcher { + chans := make(map[message.RequestID]chan<- message.Response) + return &messageDispatcher{ + chans: chans, } } -// register Registers a new channel with a given key. It returns the receiving channel. -func (c *messageDispatcher[K, V]) register(key K) <-chan V { +// register Registers a new request channel with the given id. It returns a +// channel for receiving response. +func (c *messageDispatcher) register(key message.RequestID) <-chan message.Response { c.Lock() defer c.Unlock() - ch := make(chan V, c.bufferSize) + ch := make(chan message.Response) c.chans[key] = ch return ch } -// length Returns the number of channels -func (c *messageDispatcher[K, V]) length() int { +// dispatch Disptaches the response to the channel with the given request id +func (c *messageDispatcher) dispatch(key message.RequestID, res message.Response) error { c.Lock() defer c.Unlock() - return len(c.chans) -} - -// dispatch Disptaches the msg to the channel with the given key -func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error { - c.Lock() - ch, ok := c.chans[key] - c.Unlock() - - if !ok { - return fmt.Errorf("Channel not found") + if ch, ok := c.chans[key]; ok { + ch <- res + } else { + return requestChannelNotFoundErr } - ch <- msg return nil } -// unregister Unregisters the channel with the provided key -func (c *messageDispatcher[K, V]) unregister(key K) { +// unregister Unregisters the request with the provided id +func (c *messageDispatcher) unregister(key message.RequestID) { c.Lock() defer c.Unlock() + if ch, ok := c.chans[key]; ok { close(ch) delete(c.chans, key) } } -// clear Closes all the channels and remove them from the map -func (c *messageDispatcher[K, V]) clear() { +// clear Closes all the request channels and remove them from the map +func (c *messageDispatcher) clear() { c.Lock() defer c.Unlock() - for k, ch := range c.chans { + for _, ch := range c.chans { close(ch) - delete(c.chans, k) } + c.chans = nil } -- cgit v1.2.3