From fa0b0efc14f84ff87789cabe0010f3240245407c Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 31 May 2024 02:17:56 +0200 Subject: init commit --- client/channels.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 client/channels.go (limited to 'client/channels.go') diff --git a/client/channels.go b/client/channels.go new file mode 100644 index 0000000..673e366 --- /dev/null +++ b/client/channels.go @@ -0,0 +1,78 @@ +package client + +import ( + "fmt" + "sync" +) + +// channels is a generic structure that holds a map of keys and channels. +// It is protected by mutex +type channels[K comparable, V any] struct { + sync.Mutex + chans map[K]chan<- V + bufferSize int +} + +// newChannels creates a new channels +func newChannels[K comparable, V any](bufferSize int) channels[K, V] { + chans := make(map[K]chan<- V) + return channels[K, V]{ + chans: chans, + bufferSize: bufferSize, + } +} + +// add adds a new channel and returns the receiving channel +func (c *channels[K, V]) add(key K) <-chan V { + c.Lock() + defer c.Unlock() + + ch := make(chan V, c.bufferSize) + c.chans[key] = ch + return ch +} + +// length returns the number of channels +func (c *channels[K, V]) length() int { + c.Lock() + defer c.Unlock() + + return len(c.chans) +} + +// notify notifies the channel with the given key +func (c *channels[K, V]) notify(key K, msg V) error { + c.Lock() + defer c.Unlock() + + if ch, ok := c.chans[key]; ok { + ch <- msg + return nil + } + + return fmt.Errorf("Channel not found") +} + +// remove removes and returns the channel. +func (c *channels[K, V]) remove(key K) chan<- V { + c.Lock() + defer c.Unlock() + + if ch, ok := c.chans[key]; ok { + delete(c.chans, key) + return ch + } + + return nil +} + +// clear close all the channels and remove them from the map +func (c *channels[K, V]) clear() { + c.Lock() + defer c.Unlock() + + for k, ch := range c.chans { + close(ch) + delete(c.chans, k) + } +} -- cgit v1.2.3