From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/subscriptions.go | 81 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 jsonrpc/client/subscriptions.go (limited to 'jsonrpc/client/subscriptions.go') diff --git a/jsonrpc/client/subscriptions.go b/jsonrpc/client/subscriptions.go new file mode 100644 index 0000000..0524837 --- /dev/null +++ b/jsonrpc/client/subscriptions.go @@ -0,0 +1,81 @@ +package client + +import ( + "encoding/json" + "errors" + "sync" + + "github.com/karyontech/karyon-go/jsonrpc/message" +) + +var ( + subscriptionNotFoundErr = errors.New("Subscription not found") +) + +// subscriptions Is a structure that holds a map of subscription IDs and +// subscriptions +type subscriptions struct { + sync.Mutex + subs map[message.SubscriptionID]*Subscription + bufferSize int +} + +// newSubscriptions Creates a new subscriptions +func newSubscriptions(bufferSize int) *subscriptions { + subs := make(map[message.SubscriptionID]*Subscription) + return &subscriptions{ + subs: subs, + bufferSize: bufferSize, + } +} + +// subscribe Subscribes and returns a Subscription. +func (c *subscriptions) subscribe(key message.SubscriptionID) *Subscription { + c.Lock() + defer c.Unlock() + + sub := newSubscription(key, c.bufferSize) + c.subs[key] = sub + return sub +} + +// notify Notifies the msg the subscription with the given id +func (c *subscriptions) notify(key message.SubscriptionID, msg json.RawMessage) error { + c.Lock() + defer c.Unlock() + + sub, ok := c.subs[key] + + if !ok { + return subscriptionNotFoundErr + } + + err := sub.notify(msg) + + if err != nil { + return err + } + + return nil +} + +// unsubscribe Unsubscribe from the subscription with the provided id +func (c *subscriptions) unsubscribe(key message.SubscriptionID) { + c.Lock() + defer c.Unlock() + if sub, ok := c.subs[key]; ok { + sub.stop() + delete(c.subs, key) + } +} + +// clear Stops all the subscriptions and remove them from the map +func (c *subscriptions) clear() { + c.Lock() + defer c.Unlock() + + for _, sub := range c.subs { + sub.stop() + } + c.subs = nil +} -- cgit v1.2.3