aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/message_dispatcher.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-13 06:02:24 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-13 06:02:24 +0200
commit8c2d37e093ca64d591fc0aec15a7e2ed424b2e47 (patch)
treefd9bc62e29087a18e7eb4bdd0a1f587ba63e5dd5 /jsonrpc/client/message_dispatcher.go
parenta338905a7f8a2206161cc15f07bda872b9bfc09c (diff)
use message dispatcher to process responses and notifications & spread out comments
Diffstat (limited to 'jsonrpc/client/message_dispatcher.go')
-rw-r--r--jsonrpc/client/message_dispatcher.go75
1 files changed, 75 insertions, 0 deletions
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go
new file mode 100644
index 0000000..6484484
--- /dev/null
+++ b/jsonrpc/client/message_dispatcher.go
@@ -0,0 +1,75 @@
+package client
+
+import (
+ "fmt"
+ "sync"
+)
+
+// messageDispatcher Is a generic structure that holds a map of keys and
+// channels, and it is protected by mutex
+type messageDispatcher[K comparable, V any] struct {
+ sync.Mutex
+ chans map[K]chan<- V
+ bufferSize int
+}
+
+// newMessageDispatcher Creates a new messageDispatcher
+func newMessageDispatcher[K comparable, V any](bufferSize int) messageDispatcher[K, V] {
+ chans := make(map[K]chan<- V)
+ return messageDispatcher[K, V]{
+ chans: chans,
+ bufferSize: bufferSize,
+ }
+}
+
+// register Registers a new channel with a given key. It returns the receiving channel.
+func (c *messageDispatcher[K, V]) register(key K) <-chan V {
+ c.Lock()
+ defer c.Unlock()
+
+ ch := make(chan V, c.bufferSize)
+ c.chans[key] = ch
+ return ch
+}
+
+// length Returns the number of channels
+func (c *messageDispatcher[K, V]) length() int {
+ c.Lock()
+ defer c.Unlock()
+
+ return len(c.chans)
+}
+
+// disptach Disptaches the msg to the channel with the given key
+func (c *messageDispatcher[K, V]) disptach(key K, msg V) error {
+ c.Lock()
+ defer c.Unlock()
+
+ if ch, ok := c.chans[key]; ok {
+ ch <- msg
+ return nil
+ }
+
+ return fmt.Errorf("Channel not found")
+}
+
+// unregister Unregisters the channel with the provided key
+func (c *messageDispatcher[K, V]) unregister(key K) {
+ c.Lock()
+ defer c.Unlock()
+ if ch, ok := c.chans[key]; ok {
+ close(ch)
+ delete(c.chans, key)
+ }
+}
+
+// clear Closes all the channels and remove them from the map
+func (c *messageDispatcher[K, V]) clear() {
+ c.Lock()
+ defer c.Unlock()
+
+ for k, ch := range c.chans {
+ close(ch)
+ delete(c.chans, k)
+ }
+}