From 6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 23 Jun 2024 15:57:43 +0200 Subject: Fix the issue with message dispatcher and channels Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block. --- jsonrpc/client/client.go | 124 +++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 48 deletions(-) (limited to 'jsonrpc/client/client.go') diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go index afcc5c7..1f12ced 100644 --- a/jsonrpc/client/client.go +++ b/jsonrpc/client/client.go @@ -3,9 +3,11 @@ package client import ( "bytes" "encoding/json" + "errors" "fmt" "math/rand" "strconv" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -20,21 +22,34 @@ const ( // Default timeout for receiving requests from the server, in milliseconds. DefaultTimeout = 3000 + + // The default buffer size for a subscription. + DefaultSubscriptionBufferSize = 10000 +) + +var ( + ClientIsDisconnectedErr = errors.New("Client is disconnected and closed") + TimeoutError = errors.New("Timeout Error") + InvalidResponseIDErr = errors.New("Invalid response ID") + InvalidResponseResultErr = errors.New("Invalid response result") + receivedStopSignalErr = errors.New("Received stop signal") ) // RPCClientConfig Holds the configuration settings for the RPC client. type RPCClientConfig struct { - Timeout int // Timeout for receiving requests from the server, in milliseconds. - Addr string // Address of the RPC server. + Timeout int // Timeout for receiving requests from the server, in milliseconds. + Addr string // Address of the RPC server. + SubscriptionBufferSize int // The buffer size for a subscription. } // RPCClient RPC Client type RPCClient struct { config RPCClientConfig conn *websocket.Conn - requests *messageDispatcher[message.RequestID, message.Response] - subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage] - stop_signal chan struct{} + requests *messageDispatcher + subscriptions *subscriptions + stopSignal chan struct{} + isClosed atomic.Bool } // NewRPCClient Creates a new instance of RPCClient with the provided configuration. @@ -46,25 +61,29 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { } log.Infof("Successfully connected to the server: %s", config.Addr) - if config.Timeout == 0 { + if config.Timeout <= 0 { config.Timeout = DefaultTimeout } - stop_signal := make(chan struct{}) + if config.SubscriptionBufferSize <= 0 { + config.SubscriptionBufferSize = DefaultSubscriptionBufferSize + } + + stopSignal := make(chan struct{}) - requests := newMessageDispatcher[message.RequestID, message.Response](0) - subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100) + requests := newMessageDispatcher() + subscriptions := newSubscriptions(config.SubscriptionBufferSize) client := &RPCClient{ conn: conn, config: config, requests: requests, subscriptions: subscriptions, - stop_signal: stop_signal, + stopSignal: stopSignal, } go func() { - if err := client.backgroundReceivingLoop(stop_signal); err != nil { + if err := client.backgroundReceivingLoop(stopSignal); err != nil { client.Close() } }() @@ -74,9 +93,14 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { // Close Closes the underlying websocket connection and stop the receiving loop. func (client *RPCClient) Close() { + // Check if it's already closed + if !client.isClosed.CompareAndSwap(false, true) { + return + } + log.Warn("Close the rpc client...") // Send stop signal to the background receiving loop - close(client.stop_signal) + close(client.stopSignal) // Close the underlying websocket connection err := client.conn.Close() @@ -90,7 +114,7 @@ func (client *RPCClient) Close() { // Call Sends an RPC call to the server with the specified method and // parameters, and returns the response. -func (client *RPCClient) Call(method string, params any) (*json.RawMessage, error) { +func (client *RPCClient) Call(method string, params any) (json.RawMessage, error) { log.Tracef("Call -> method: %s, params: %v", method, params) response, err := client.sendRequest(method, params) if err != nil { @@ -101,29 +125,27 @@ func (client *RPCClient) Call(method string, params any) (*json.RawMessage, erro } // Subscribe Sends a subscription request to the server with the specified -// method and parameters, and it returns the subscription ID and the channel to -// receive notifications. -func (client *RPCClient) Subscribe(method string, params any) (message.SubscriptionID, <-chan json.RawMessage, error) { +// method and parameters, and it returns the subscription. +func (client *RPCClient) Subscribe(method string, params any) (*Subscription, error) { log.Tracef("Sbuscribe -> method: %s, params: %v", method, params) response, err := client.sendRequest(method, params) if err != nil { - return 0, nil, err + return nil, err } if response.Result == nil { - return 0, nil, fmt.Errorf("Invalid response result") + return nil, InvalidResponseResultErr } var subID message.SubscriptionID - err = json.Unmarshal(*response.Result, &subID) + err = json.Unmarshal(response.Result, &subID) if err != nil { - return 0, nil, err + return nil, err } - // Register a new subscription - sub := client.subscriptions.register(subID) + sub := client.subscriptions.subscribe(subID) - return subID, sub, nil + return sub, nil } // Unsubscribe Sends an unsubscription request to the server to cancel the @@ -135,44 +157,49 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID return err } - // On success unregister the subscription channel - client.subscriptions.unregister(subID) + // On success unsubscribe + client.subscriptions.unsubscribe(subID) return nil } // backgroundReceivingLoop Starts reading new messages from the underlying connection. -func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error { +func (client *RPCClient) backgroundReceivingLoop(stopSignal <-chan struct{}) error { log.Debug("Background loop started") - new_msg := make(chan []byte) - receive_err := make(chan error) + newMsgCh := make(chan []byte) + receiveErrCh := make(chan error) // Start listing for new messages go func() { for { _, msg, err := client.conn.ReadMessage() if err != nil { - receive_err <- err + receiveErrCh <- err + return + } + select { + case <-client.stopSignal: return + case newMsgCh <- msg: } - new_msg <- msg } }() for { select { - case <-stop_signal: - log.Warn("Stopping background receiving loop: received stop signal") + case err := <-receiveErrCh: + log.WithError(err).Error("Read a new msg") + return err + case <-stopSignal: + log.Debug("Background receiving loop stopped %w", receivedStopSignalErr) return nil - case msg := <-new_msg: + case msg := <-newMsgCh: err := client.handleNewMsg(msg) if err != nil { - log.WithError(err).Error("Handle a new received msg") + log.WithError(err).Error("Handle a msg") + return err } - case err := <-receive_err: - log.WithError(err).Error("Receive a new msg") - return err } } } @@ -186,7 +213,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { decoder.DisallowUnknownFields() if err := decoder.Decode(&response); err == nil { if response.ID == nil { - return fmt.Errorf("Response doesn't have an id") + return InvalidResponseIDErr } err := client.requests.dispatch(*response.ID, response) @@ -202,18 +229,13 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { if err := json.Unmarshal(msg, ¬ification); err == nil { ntRes := message.NotificationResult{} - if err := json.Unmarshal(*notification.Params, &ntRes); err != nil { + if err := json.Unmarshal(notification.Params, &ntRes); err != nil { return fmt.Errorf("Failed to unmarshal notification params: %w", err) } - // TODO: Consider using a more efficient design here because, - // on each registration, messageDispatcher will create a new subscription - // channel with the provided buffer size. If the buffer for the subscription - // channel is full, calling the dispatch method will block and in this - // case the client will not be able to handle any more messages. - err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result) + err := client.subscriptions.notify(ntRes.Subscription, ntRes.Result) if err != nil { - return fmt.Errorf("Dispatch a notification: %w", err) + return fmt.Errorf("Notify a subscriber: %w", err) } log.Debugf("<-- %s", notification.String()) @@ -228,6 +250,10 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { func (client *RPCClient) sendRequest(method string, params any) (message.Response, error) { response := message.Response{} + if client.isClosed.Load() { + return response, ClientIsDisconnectedErr + } + params_bytes, err := json.Marshal(params) if err != nil { return response, err @@ -262,8 +288,10 @@ func (client *RPCClient) sendRequest(method string, params any) (message.Respons // Waits the response, it fails and return error if it exceed the timeout select { case response = <-rx_ch: + case <-client.stopSignal: + return response, ClientIsDisconnectedErr case <-time.After(time.Duration(client.config.Timeout) * time.Millisecond): - return response, fmt.Errorf("Timeout error") + return response, TimeoutError } err = validateResponse(&response, id) @@ -285,7 +313,7 @@ func validateResponse(res *message.Response, reqID message.RequestID) error { if res.ID != nil { if *res.ID != reqID { - return fmt.Errorf("Invalid response id") + return InvalidResponseIDErr } } -- cgit v1.2.3