aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/message_dispatcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/client/message_dispatcher.go')
-rw-r--r--jsonrpc/client/message_dispatcher.go64
1 files changed, 31 insertions, 33 deletions
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go
index 9177f6e..adedd1c 100644
--- a/jsonrpc/client/message_dispatcher.go
+++ b/jsonrpc/client/message_dispatcher.go
@@ -1,76 +1,74 @@
package client
import (
- "fmt"
+ "errors"
"sync"
+
+ "github.com/karyontech/karyon-go/jsonrpc/message"
+)
+
+var (
+ requestChannelNotFoundErr = errors.New("Request channel not found")
)
-// messageDispatcher Is a generic structure that holds a map of keys and
+// messageDispatcher Is a structure that holds a map of request IDs and
// channels, and it is protected by mutex
-type messageDispatcher[K comparable, V any] struct {
+type messageDispatcher struct {
sync.Mutex
- chans map[K]chan<- V
- bufferSize int
+ chans map[message.RequestID]chan<- message.Response
}
// newMessageDispatcher Creates a new messageDispatcher
-func newMessageDispatcher[K comparable, V any](bufferSize int) *messageDispatcher[K, V] {
- chans := make(map[K]chan<- V)
- return &messageDispatcher[K, V]{
- chans: chans,
- bufferSize: bufferSize,
+func newMessageDispatcher() *messageDispatcher {
+ chans := make(map[message.RequestID]chan<- message.Response)
+ return &messageDispatcher{
+ chans: chans,
}
}
-// register Registers a new channel with a given key. It returns the receiving channel.
-func (c *messageDispatcher[K, V]) register(key K) <-chan V {
+// register Registers a new request channel with the given id. It returns a
+// channel for receiving response.
+func (c *messageDispatcher) register(key message.RequestID) <-chan message.Response {
c.Lock()
defer c.Unlock()
- ch := make(chan V, c.bufferSize)
+ ch := make(chan message.Response)
c.chans[key] = ch
return ch
}
-// length Returns the number of channels
-func (c *messageDispatcher[K, V]) length() int {
+// dispatch Disptaches the response to the channel with the given request id
+func (c *messageDispatcher) dispatch(key message.RequestID, res message.Response) error {
c.Lock()
defer c.Unlock()
- return len(c.chans)
-}
-
-// dispatch Disptaches the msg to the channel with the given key
-func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error {
- c.Lock()
- ch, ok := c.chans[key]
- c.Unlock()
-
- if !ok {
- return fmt.Errorf("Channel not found")
+ if ch, ok := c.chans[key]; ok {
+ ch <- res
+ } else {
+ return requestChannelNotFoundErr
}
- ch <- msg
return nil
}
-// unregister Unregisters the channel with the provided key
-func (c *messageDispatcher[K, V]) unregister(key K) {
+// unregister Unregisters the request with the provided id
+func (c *messageDispatcher) unregister(key message.RequestID) {
c.Lock()
defer c.Unlock()
+
if ch, ok := c.chans[key]; ok {
close(ch)
delete(c.chans, key)
}
}
-// clear Closes all the channels and remove them from the map
-func (c *messageDispatcher[K, V]) clear() {
+// clear Closes all the request channels and remove them from the map
+func (c *messageDispatcher) clear() {
c.Lock()
defer c.Unlock()
- for k, ch := range c.chans {
+ for _, ch := range c.chans {
close(ch)
- delete(c.chans, k)
}
+ c.chans = nil
}