diff options
Diffstat (limited to 'jsonrpc/client/subscriptions.go')
-rw-r--r-- | jsonrpc/client/subscriptions.go | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/jsonrpc/client/subscriptions.go b/jsonrpc/client/subscriptions.go new file mode 100644 index 0000000..0524837 --- /dev/null +++ b/jsonrpc/client/subscriptions.go @@ -0,0 +1,81 @@ +package client + +import ( + "encoding/json" + "errors" + "sync" + + "github.com/karyontech/karyon-go/jsonrpc/message" +) + +var ( + subscriptionNotFoundErr = errors.New("Subscription not found") +) + +// subscriptions Is a structure that holds a map of subscription IDs and +// subscriptions +type subscriptions struct { + sync.Mutex + subs map[message.SubscriptionID]*Subscription + bufferSize int +} + +// newSubscriptions Creates a new subscriptions +func newSubscriptions(bufferSize int) *subscriptions { + subs := make(map[message.SubscriptionID]*Subscription) + return &subscriptions{ + subs: subs, + bufferSize: bufferSize, + } +} + +// subscribe Subscribes and returns a Subscription. +func (c *subscriptions) subscribe(key message.SubscriptionID) *Subscription { + c.Lock() + defer c.Unlock() + + sub := newSubscription(key, c.bufferSize) + c.subs[key] = sub + return sub +} + +// notify Notifies the msg the subscription with the given id +func (c *subscriptions) notify(key message.SubscriptionID, msg json.RawMessage) error { + c.Lock() + defer c.Unlock() + + sub, ok := c.subs[key] + + if !ok { + return subscriptionNotFoundErr + } + + err := sub.notify(msg) + + if err != nil { + return err + } + + return nil +} + +// unsubscribe Unsubscribe from the subscription with the provided id +func (c *subscriptions) unsubscribe(key message.SubscriptionID) { + c.Lock() + defer c.Unlock() + if sub, ok := c.subs[key]; ok { + sub.stop() + delete(c.subs, key) + } +} + +// clear Stops all the subscriptions and remove them from the map +func (c *subscriptions) clear() { + c.Lock() + defer c.Unlock() + + for _, sub := range c.subs { + sub.stop() + } + c.subs = nil +} |