aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/subscriptions.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-23 15:57:43 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-09 11:46:03 +0200
commit6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch)
tree3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/subscriptions.go
parent223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff)
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/subscriptions.go')
-rw-r--r--jsonrpc/client/subscriptions.go81
1 files changed, 81 insertions, 0 deletions
diff --git a/jsonrpc/client/subscriptions.go b/jsonrpc/client/subscriptions.go
new file mode 100644
index 0000000..0524837
--- /dev/null
+++ b/jsonrpc/client/subscriptions.go
@@ -0,0 +1,81 @@
+package client
+
+import (
+ "encoding/json"
+ "errors"
+ "sync"
+
+ "github.com/karyontech/karyon-go/jsonrpc/message"
+)
+
+var (
+ subscriptionNotFoundErr = errors.New("Subscription not found")
+)
+
+// subscriptions Is a structure that holds a map of subscription IDs and
+// subscriptions
+type subscriptions struct {
+ sync.Mutex
+ subs map[message.SubscriptionID]*Subscription
+ bufferSize int
+}
+
+// newSubscriptions Creates a new subscriptions
+func newSubscriptions(bufferSize int) *subscriptions {
+ subs := make(map[message.SubscriptionID]*Subscription)
+ return &subscriptions{
+ subs: subs,
+ bufferSize: bufferSize,
+ }
+}
+
+// subscribe Subscribes and returns a Subscription.
+func (c *subscriptions) subscribe(key message.SubscriptionID) *Subscription {
+ c.Lock()
+ defer c.Unlock()
+
+ sub := newSubscription(key, c.bufferSize)
+ c.subs[key] = sub
+ return sub
+}
+
+// notify Notifies the msg the subscription with the given id
+func (c *subscriptions) notify(key message.SubscriptionID, msg json.RawMessage) error {
+ c.Lock()
+ defer c.Unlock()
+
+ sub, ok := c.subs[key]
+
+ if !ok {
+ return subscriptionNotFoundErr
+ }
+
+ err := sub.notify(msg)
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// unsubscribe Unsubscribe from the subscription with the provided id
+func (c *subscriptions) unsubscribe(key message.SubscriptionID) {
+ c.Lock()
+ defer c.Unlock()
+ if sub, ok := c.subs[key]; ok {
+ sub.stop()
+ delete(c.subs, key)
+ }
+}
+
+// clear Stops all the subscriptions and remove them from the map
+func (c *subscriptions) clear() {
+ c.Lock()
+ defer c.Unlock()
+
+ for _, sub := range c.subs {
+ sub.stop()
+ }
+ c.subs = nil
+}