aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/client.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-23 15:57:43 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-09 11:46:03 +0200
commit6355144b8c3514cccc5c2ab4f7c4fd8e76a1a9fc (patch)
tree3c31e350c8da79198f6127398905461addccef1e /jsonrpc/client/client.go
parent223d80fa52d3efd2909b7061e3c42a0ed930b4ff (diff)
Fix the issue with message dispatcher and channels
Resolved a previous error where each subscription would create a new channel with the fixed buffer size. This caused blocking when the channel buffer was full, preventing the client from handling additional messages. Now, there is a `subscriptions` struct that holds a queue for receiving notifications, ensuring the notify function does not block.
Diffstat (limited to 'jsonrpc/client/client.go')
-rw-r--r--jsonrpc/client/client.go124
1 files changed, 76 insertions, 48 deletions
diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go
index afcc5c7..1f12ced 100644
--- a/jsonrpc/client/client.go
+++ b/jsonrpc/client/client.go
@@ -3,9 +3,11 @@ package client
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"math/rand"
"strconv"
+ "sync/atomic"
"time"
"github.com/gorilla/websocket"
@@ -20,21 +22,34 @@ const (
// Default timeout for receiving requests from the server, in milliseconds.
DefaultTimeout = 3000
+
+ // The default buffer size for a subscription.
+ DefaultSubscriptionBufferSize = 10000
+)
+
+var (
+ ClientIsDisconnectedErr = errors.New("Client is disconnected and closed")
+ TimeoutError = errors.New("Timeout Error")
+ InvalidResponseIDErr = errors.New("Invalid response ID")
+ InvalidResponseResultErr = errors.New("Invalid response result")
+ receivedStopSignalErr = errors.New("Received stop signal")
)
// RPCClientConfig Holds the configuration settings for the RPC client.
type RPCClientConfig struct {
- Timeout int // Timeout for receiving requests from the server, in milliseconds.
- Addr string // Address of the RPC server.
+ Timeout int // Timeout for receiving requests from the server, in milliseconds.
+ Addr string // Address of the RPC server.
+ SubscriptionBufferSize int // The buffer size for a subscription.
}
// RPCClient RPC Client
type RPCClient struct {
config RPCClientConfig
conn *websocket.Conn
- requests *messageDispatcher[message.RequestID, message.Response]
- subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage]
- stop_signal chan struct{}
+ requests *messageDispatcher
+ subscriptions *subscriptions
+ stopSignal chan struct{}
+ isClosed atomic.Bool
}
// NewRPCClient Creates a new instance of RPCClient with the provided configuration.
@@ -46,25 +61,29 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
}
log.Infof("Successfully connected to the server: %s", config.Addr)
- if config.Timeout == 0 {
+ if config.Timeout <= 0 {
config.Timeout = DefaultTimeout
}
- stop_signal := make(chan struct{})
+ if config.SubscriptionBufferSize <= 0 {
+ config.SubscriptionBufferSize = DefaultSubscriptionBufferSize
+ }
+
+ stopSignal := make(chan struct{})
- requests := newMessageDispatcher[message.RequestID, message.Response](0)
- subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100)
+ requests := newMessageDispatcher()
+ subscriptions := newSubscriptions(config.SubscriptionBufferSize)
client := &RPCClient{
conn: conn,
config: config,
requests: requests,
subscriptions: subscriptions,
- stop_signal: stop_signal,
+ stopSignal: stopSignal,
}
go func() {
- if err := client.backgroundReceivingLoop(stop_signal); err != nil {
+ if err := client.backgroundReceivingLoop(stopSignal); err != nil {
client.Close()
}
}()
@@ -74,9 +93,14 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
// Close Closes the underlying websocket connection and stop the receiving loop.
func (client *RPCClient) Close() {
+ // Check if it's already closed
+ if !client.isClosed.CompareAndSwap(false, true) {
+ return
+ }
+
log.Warn("Close the rpc client...")
// Send stop signal to the background receiving loop
- close(client.stop_signal)
+ close(client.stopSignal)
// Close the underlying websocket connection
err := client.conn.Close()
@@ -90,7 +114,7 @@ func (client *RPCClient) Close() {
// Call Sends an RPC call to the server with the specified method and
// parameters, and returns the response.
-func (client *RPCClient) Call(method string, params any) (*json.RawMessage, error) {
+func (client *RPCClient) Call(method string, params any) (json.RawMessage, error) {
log.Tracef("Call -> method: %s, params: %v", method, params)
response, err := client.sendRequest(method, params)
if err != nil {
@@ -101,29 +125,27 @@ func (client *RPCClient) Call(method string, params any) (*json.RawMessage, erro
}
// Subscribe Sends a subscription request to the server with the specified
-// method and parameters, and it returns the subscription ID and the channel to
-// receive notifications.
-func (client *RPCClient) Subscribe(method string, params any) (message.SubscriptionID, <-chan json.RawMessage, error) {
+// method and parameters, and it returns the subscription.
+func (client *RPCClient) Subscribe(method string, params any) (*Subscription, error) {
log.Tracef("Sbuscribe -> method: %s, params: %v", method, params)
response, err := client.sendRequest(method, params)
if err != nil {
- return 0, nil, err
+ return nil, err
}
if response.Result == nil {
- return 0, nil, fmt.Errorf("Invalid response result")
+ return nil, InvalidResponseResultErr
}
var subID message.SubscriptionID
- err = json.Unmarshal(*response.Result, &subID)
+ err = json.Unmarshal(response.Result, &subID)
if err != nil {
- return 0, nil, err
+ return nil, err
}
- // Register a new subscription
- sub := client.subscriptions.register(subID)
+ sub := client.subscriptions.subscribe(subID)
- return subID, sub, nil
+ return sub, nil
}
// Unsubscribe Sends an unsubscription request to the server to cancel the
@@ -135,44 +157,49 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID
return err
}
- // On success unregister the subscription channel
- client.subscriptions.unregister(subID)
+ // On success unsubscribe
+ client.subscriptions.unsubscribe(subID)
return nil
}
// backgroundReceivingLoop Starts reading new messages from the underlying connection.
-func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error {
+func (client *RPCClient) backgroundReceivingLoop(stopSignal <-chan struct{}) error {
log.Debug("Background loop started")
- new_msg := make(chan []byte)
- receive_err := make(chan error)
+ newMsgCh := make(chan []byte)
+ receiveErrCh := make(chan error)
// Start listing for new messages
go func() {
for {
_, msg, err := client.conn.ReadMessage()
if err != nil {
- receive_err <- err
+ receiveErrCh <- err
+ return
+ }
+ select {
+ case <-client.stopSignal:
return
+ case newMsgCh <- msg:
}
- new_msg <- msg
}
}()
for {
select {
- case <-stop_signal:
- log.Warn("Stopping background receiving loop: received stop signal")
+ case err := <-receiveErrCh:
+ log.WithError(err).Error("Read a new msg")
+ return err
+ case <-stopSignal:
+ log.Debug("Background receiving loop stopped %w", receivedStopSignalErr)
return nil
- case msg := <-new_msg:
+ case msg := <-newMsgCh:
err := client.handleNewMsg(msg)
if err != nil {
- log.WithError(err).Error("Handle a new received msg")
+ log.WithError(err).Error("Handle a msg")
+ return err
}
- case err := <-receive_err:
- log.WithError(err).Error("Receive a new msg")
- return err
}
}
}
@@ -186,7 +213,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
decoder.DisallowUnknownFields()
if err := decoder.Decode(&response); err == nil {
if response.ID == nil {
- return fmt.Errorf("Response doesn't have an id")
+ return InvalidResponseIDErr
}
err := client.requests.dispatch(*response.ID, response)
@@ -202,18 +229,13 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
if err := json.Unmarshal(msg, &notification); err == nil {
ntRes := message.NotificationResult{}
- if err := json.Unmarshal(*notification.Params, &ntRes); err != nil {
+ if err := json.Unmarshal(notification.Params, &ntRes); err != nil {
return fmt.Errorf("Failed to unmarshal notification params: %w", err)
}
- // TODO: Consider using a more efficient design here because,
- // on each registration, messageDispatcher will create a new subscription
- // channel with the provided buffer size. If the buffer for the subscription
- // channel is full, calling the dispatch method will block and in this
- // case the client will not be able to handle any more messages.
- err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result)
+ err := client.subscriptions.notify(ntRes.Subscription, ntRes.Result)
if err != nil {
- return fmt.Errorf("Dispatch a notification: %w", err)
+ return fmt.Errorf("Notify a subscriber: %w", err)
}
log.Debugf("<-- %s", notification.String())
@@ -228,6 +250,10 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
func (client *RPCClient) sendRequest(method string, params any) (message.Response, error) {
response := message.Response{}
+ if client.isClosed.Load() {
+ return response, ClientIsDisconnectedErr
+ }
+
params_bytes, err := json.Marshal(params)
if err != nil {
return response, err
@@ -262,8 +288,10 @@ func (client *RPCClient) sendRequest(method string, params any) (message.Respons
// Waits the response, it fails and return error if it exceed the timeout
select {
case response = <-rx_ch:
+ case <-client.stopSignal:
+ return response, ClientIsDisconnectedErr
case <-time.After(time.Duration(client.config.Timeout) * time.Millisecond):
- return response, fmt.Errorf("Timeout error")
+ return response, TimeoutError
}
err = validateResponse(&response, id)
@@ -285,7 +313,7 @@ func validateResponse(res *message.Response, reqID message.RequestID) error {
if res.ID != nil {
if *res.ID != reqID {
- return fmt.Errorf("Invalid response id")
+ return InvalidResponseIDErr
}
}