From 1a221d4d37f4bdee1fb45141828e201948e8ddd3 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sat, 15 Jun 2024 05:46:08 +0200 Subject: fix typos --- jsonrpc/client/client.go | 56 +++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 25 deletions(-) (limited to 'jsonrpc/client/client.go') diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go index 67ed307..afcc5c7 100644 --- a/jsonrpc/client/client.go +++ b/jsonrpc/client/client.go @@ -30,11 +30,11 @@ type RPCClientConfig struct { // RPCClient RPC Client type RPCClient struct { - config RPCClientConfig - conn *websocket.Conn - requests *messageDispatcher[message.RequestID, message.Response] - subscriber *messageDispatcher[message.SubscriptionID, json.RawMessage] - stop_signal chan struct{} + config RPCClientConfig + conn *websocket.Conn + requests *messageDispatcher[message.RequestID, message.Response] + subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage] + stop_signal chan struct{} } // NewRPCClient Creates a new instance of RPCClient with the provided configuration. @@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { config.Timeout = DefaultTimeout } - stop_signal := make(chan struct{}, 2) + stop_signal := make(chan struct{}) + + requests := newMessageDispatcher[message.RequestID, message.Response](0) + subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100) client := &RPCClient{ - conn: conn, - config: config, - requests: newMessageDispatcher[message.RequestID, message.Response](1), - subscriber: newMessageDispatcher[message.SubscriptionID, json.RawMessage](10), - stop_signal: stop_signal, + conn: conn, + config: config, + requests: requests, + subscriptions: subscriptions, + stop_signal: stop_signal, } go func() { @@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) { func (client *RPCClient) Close() { log.Warn("Close the rpc client...") // Send stop signal to the background receiving loop - client.stop_signal <- struct{}{} + close(client.stop_signal) // Close the underlying websocket connection err := client.conn.Close() @@ -82,7 +85,7 @@ func (client *RPCClient) Close() { } client.requests.clear() - client.subscriber.clear() + client.subscriptions.clear() } // Call Sends an RPC call to the server with the specified method and @@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript } // Register a new subscription - sub := client.subscriber.register(subID) + sub := client.subscriptions.register(subID) return subID, sub, nil } @@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID } // On success unregister the subscription channel - client.subscriber.unregister(subID) + client.subscriptions.unregister(subID) return nil } // backgroundReceivingLoop Starts reading new messages from the underlying connection. func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error { - log.Debug("Background loop started") - new_msg_ch := make(chan []byte) - receive_err_ch := make(chan error) + new_msg := make(chan []byte) + receive_err := make(chan error) // Start listing for new messages go func() { for { _, msg, err := client.conn.ReadMessage() if err != nil { - receive_err_ch <- err + receive_err <- err return } - new_msg_ch <- msg + new_msg <- msg } }() @@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er case <-stop_signal: log.Warn("Stopping background receiving loop: received stop signal") return nil - case msg := <-new_msg_ch: + case msg := <-new_msg: err := client.handleNewMsg(msg) if err != nil { log.WithError(err).Error("Handle a new received msg") } - case err := <-receive_err_ch: + case err := <-receive_err: log.WithError(err).Error("Receive a new msg") return err } @@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { return fmt.Errorf("Response doesn't have an id") } - err := client.requests.disptach(*response.ID, response) + err := client.requests.dispatch(*response.ID, response) if err != nil { return fmt.Errorf("Dispatch a response: %w", err) } @@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error { return fmt.Errorf("Failed to unmarshal notification params: %w", err) } - // Send the notification to the subscription - err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result) + // TODO: Consider using a more efficient design here because, + // on each registration, messageDispatcher will create a new subscription + // channel with the provided buffer size. If the buffer for the subscription + // channel is full, calling the dispatch method will block and in this + // case the client will not be able to handle any more messages. + err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result) if err != nil { return fmt.Errorf("Dispatch a notification: %w", err) } -- cgit v1.2.3