diff options
Diffstat (limited to 'jsonrpc/client')
-rw-r--r-- | jsonrpc/client/client.go | 56 | ||||
-rw-r--r-- | jsonrpc/client/message_dispatcher.go | 15 | ||||
-rw-r--r-- | jsonrpc/client/message_dispatcher_test.go | 13 |
3 files changed, 43 insertions, 41 deletions
diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go index 67ed307..afcc5c7 100644 --- a/jsonrpc/client/client.go +++ b/jsonrpc/client/client.go @@ -30,11 +30,11 @@ type RPCClientConfig struct { // RPCClient RPC Client type RPCClient struct { - config RPCClientConfig - conn *websocket.Conn - requests *messageDispatcher[message.RequestID, message.Response] - subscriber *messageDispatcher[message.SubscriptionID, json.RawMessage] - stop_signal chan struct{} + config RPCClientConfig + conn *websocket.Conn + requests *messageDispatcher[message.RequestID, message.Response] + subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage] + stop_signal chan struct{} } // NewRPCClient Creates a new instance of RPCClient with the provided configuration. @@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { config.Timeout = DefaultTimeout } - stop_signal := make(chan struct{}, 2) + stop_signal := make(chan struct{}) + + requests := newMessageDispatcher[message.RequestID, message.Response](0) + subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100) client := &RPCClient{ - conn: conn, - config: config, - requests: newMessageDispatcher[message.RequestID, message.Response](1), - subscriber: newMessageDispatcher[message.SubscriptionID, json.RawMessage](10), - stop_signal: stop_signal, + conn: conn, + config: config, + requests: requests, + subscriptions: subscriptions, + stop_signal: stop_signal, } go func() { @@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { func (client *RPCClient) Close() { log.Warn("Close the rpc client...") // Send stop signal to the background receiving loop - client.stop_signal <- struct{}{} + close(client.stop_signal) // Close the underlying websocket connection err := client.conn.Close() @@ -82,7 +85,7 @@ func (client *RPCClient) Close() { } client.requests.clear() - client.subscriber.clear() + client.subscriptions.clear() } // Call Sends an RPC call to the server with the specified method and @@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript } // Register a new subscription - sub := client.subscriber.register(subID) + sub := client.subscriptions.register(subID) return subID, sub, nil } @@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID } // On success unregister the subscription channel - client.subscriber.unregister(subID) + client.subscriptions.unregister(subID) return nil } // backgroundReceivingLoop Starts reading new messages from the underlying connection. func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error { - log.Debug("Background loop started") - new_msg_ch := make(chan []byte) - receive_err_ch := make(chan error) + new_msg := make(chan []byte) + receive_err := make(chan error) // Start listing for new messages go func() { for { _, msg, err := client.conn.ReadMessage() if err != nil { - receive_err_ch <- err + receive_err <- err return } - new_msg_ch <- msg + new_msg <- msg } }() @@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er case <-stop_signal: log.Warn("Stopping background receiving loop: received stop signal") return nil - case msg := <-new_msg_ch: + case msg := <-new_msg: err := client.handleNewMsg(msg) if err != nil { log.WithError(err).Error("Handle a new received msg") } - case err := <-receive_err_ch: + case err := <-receive_err: log.WithError(err).Error("Receive a new msg") return err } @@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { return fmt.Errorf("Response doesn't have an id") } - err := client.requests.disptach(*response.ID, response) + err := client.requests.dispatch(*response.ID, response) if err != nil { return fmt.Errorf("Dispatch a response: %w", err) } @@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { return fmt.Errorf("Failed to unmarshal notification params: %w", err) } - // Send the notification to the subscription - err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result) + // TODO: Consider using a more efficient design here because, + // on each registration, messageDispatcher will create a new subscription + // channel with the provided buffer size. If the buffer for the subscription + // channel is full, calling the dispatch method will block and in this + // case the client will not be able to handle any more messages. + err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result) if err != nil { return fmt.Errorf("Dispatch a notification: %w", err) } diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go index dab75ca..9177f6e 100644 --- a/jsonrpc/client/message_dispatcher.go +++ b/jsonrpc/client/message_dispatcher.go @@ -40,17 +40,18 @@ func (c *messageDispatcher[K, V]) length() int { return len(c.chans) } -// disptach Disptaches the msg to the channel with the given key -func (c *messageDispatcher[K, V]) disptach(key K, msg V) error { +// dispatch Disptaches the msg to the channel with the given key +func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error { c.Lock() - defer c.Unlock() + ch, ok := c.chans[key] + c.Unlock() - if ch, ok := c.chans[key]; ok { - ch <- msg - return nil + if !ok { + return fmt.Errorf("Channel not found") } - return fmt.Errorf("Channel not found") + ch <- msg + return nil } // unregister Unregisters the channel with the provided key diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go index 7cc1366..a1fc1c6 100644 --- a/jsonrpc/client/message_dispatcher_test.go +++ b/jsonrpc/client/message_dispatcher_test.go @@ -1,9 +1,6 @@ package client import ( - // "sync" - // "sync/atomic" - "sync" "sync/atomic" "testing" @@ -12,7 +9,6 @@ import ( ) func TestDispatchToChannel(t *testing.T) { - messageDispatcher := newMessageDispatcher[int, int](10) chanKey := 1 @@ -26,7 +22,7 @@ func TestDispatchToChannel(t *testing.T) { wg.Add(1) go func() { for i := 0; i < 50; i++ { - err := messageDispatcher.disptach(chanKey, i) + err := messageDispatcher.dispatch(chanKey, i) assert.Nil(t, err) } @@ -37,7 +33,7 @@ func TestDispatchToChannel(t *testing.T) { wg.Add(1) go func() { for i := 0; i < 50; i++ { - err := messageDispatcher.disptach(chanKey2, i) + err := messageDispatcher.dispatch(chanKey2, i) assert.Nil(t, err) } @@ -79,12 +75,11 @@ func TestUnregisterChannel(t *testing.T) { _, ok := <-rx assert.False(t, ok, "chan closed") - err := messageDispatcher.disptach(chanKey, 1) + err := messageDispatcher.dispatch(chanKey, 1) assert.NotNil(t, err) } func TestClearChannels(t *testing.T) { - messageDispatcher := newMessageDispatcher[int, int](1) chanKey := 1 @@ -96,6 +91,6 @@ func TestClearChannels(t *testing.T) { _, ok := <-rx assert.False(t, ok, "chan closed") - err := messageDispatcher.disptach(chanKey, 1) + err := messageDispatcher.dispatch(chanKey, 1) assert.NotNil(t, err) } |