From 8c2d37e093ca64d591fc0aec15a7e2ed424b2e47 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 13 Jun 2024 06:02:24 +0200 Subject: use message dispatcher to process responses and notifications & spread out comments --- jsonrpc/client/message_dispatcher.go | 75 ++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 jsonrpc/client/message_dispatcher.go (limited to 'jsonrpc/client/message_dispatcher.go') diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go new file mode 100644 index 0000000..6484484 --- /dev/null +++ b/jsonrpc/client/message_dispatcher.go @@ -0,0 +1,75 @@ +package client + +import ( + "fmt" + "sync" +) + +// messageDispatcher Is a generic structure that holds a map of keys and +// channels, and it is protected by mutex +type messageDispatcher[K comparable, V any] struct { + sync.Mutex + chans map[K]chan<- V + bufferSize int +} + +// newMessageDispatcher Creates a new messageDispatcher +func newMessageDispatcher[K comparable, V any](bufferSize int) messageDispatcher[K, V] { + chans := make(map[K]chan<- V) + return messageDispatcher[K, V]{ + chans: chans, + bufferSize: bufferSize, + } +} + +// register Registers a new channel with a given key. It returns the receiving channel. +func (c *messageDispatcher[K, V]) register(key K) <-chan V { + c.Lock() + defer c.Unlock() + + ch := make(chan V, c.bufferSize) + c.chans[key] = ch + return ch +} + +// length Returns the number of channels +func (c *messageDispatcher[K, V]) length() int { + c.Lock() + defer c.Unlock() + + return len(c.chans) +} + +// disptach Disptaches the msg to the channel with the given key +func (c *messageDispatcher[K, V]) disptach(key K, msg V) error { + c.Lock() + defer c.Unlock() + + if ch, ok := c.chans[key]; ok { + ch <- msg + return nil + } + + return fmt.Errorf("Channel not found") +} + +// unregister Unregisters the channel with the provided key +func (c *messageDispatcher[K, V]) unregister(key K) { + c.Lock() + defer c.Unlock() + if ch, ok := c.chans[key]; ok { + close(ch) + delete(c.chans, key) + } +} + +// clear Closes all the channels and remove them from the map +func (c *messageDispatcher[K, V]) clear() { + c.Lock() + defer c.Unlock() + + for k, ch := range c.chans { + close(ch) + delete(c.chans, k) + } +} -- cgit v1.2.3