diff options
Diffstat (limited to 'jsonrpc/client/message_dispatcher.go')
-rw-r--r-- | jsonrpc/client/message_dispatcher.go | 64 |
1 files changed, 31 insertions, 33 deletions
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go index 9177f6e..adedd1c 100644 --- a/jsonrpc/client/message_dispatcher.go +++ b/jsonrpc/client/message_dispatcher.go @@ -1,76 +1,74 @@ package client import ( - "fmt" + "errors" "sync" + + "github.com/karyontech/karyon-go/jsonrpc/message" +) + +var ( + requestChannelNotFoundErr = errors.New("Request channel not found") ) -// messageDispatcher Is a generic structure that holds a map of keys and +// messageDispatcher Is a structure that holds a map of request IDs and // channels, and it is protected by mutex -type messageDispatcher[K comparable, V any] struct { +type messageDispatcher struct { sync.Mutex - chans map[K]chan<- V - bufferSize int + chans map[message.RequestID]chan<- message.Response } // newMessageDispatcher Creates a new messageDispatcher -func newMessageDispatcher[K comparable, V any](bufferSize int) *messageDispatcher[K, V] { - chans := make(map[K]chan<- V) - return &messageDispatcher[K, V]{ - chans: chans, - bufferSize: bufferSize, +func newMessageDispatcher() *messageDispatcher { + chans := make(map[message.RequestID]chan<- message.Response) + return &messageDispatcher{ + chans: chans, } } -// register Registers a new channel with a given key. It returns the receiving channel. -func (c *messageDispatcher[K, V]) register(key K) <-chan V { +// register Registers a new request channel with the given id. It returns a +// channel for receiving response. +func (c *messageDispatcher) register(key message.RequestID) <-chan message.Response { c.Lock() defer c.Unlock() - ch := make(chan V, c.bufferSize) + ch := make(chan message.Response) c.chans[key] = ch return ch } -// length Returns the number of channels -func (c *messageDispatcher[K, V]) length() int { +// dispatch Disptaches the response to the channel with the given request id +func (c *messageDispatcher) dispatch(key message.RequestID, res message.Response) error { c.Lock() defer c.Unlock() - return len(c.chans) -} - -// dispatch Disptaches the msg to the channel with the given key -func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error { - c.Lock() - ch, ok := c.chans[key] - c.Unlock() - - if !ok { - return fmt.Errorf("Channel not found") + if ch, ok := c.chans[key]; ok { + ch <- res + } else { + return requestChannelNotFoundErr } - ch <- msg return nil } -// unregister Unregisters the channel with the provided key -func (c *messageDispatcher[K, V]) unregister(key K) { +// unregister Unregisters the request with the provided id +func (c *messageDispatcher) unregister(key message.RequestID) { c.Lock() defer c.Unlock() + if ch, ok := c.chans[key]; ok { close(ch) delete(c.chans, key) } } -// clear Closes all the channels and remove them from the map -func (c *messageDispatcher[K, V]) clear() { +// clear Closes all the request channels and remove them from the map +func (c *messageDispatcher) clear() { c.Lock() defer c.Unlock() - for k, ch := range c.chans { + for _, ch := range c.chans { close(ch) - delete(c.chans, k) } + c.chans = nil } |