From 1a221d4d37f4bdee1fb45141828e201948e8ddd3 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@karyontech.net>
Date: Sat, 15 Jun 2024 05:46:08 +0200
Subject: fix typos

---
 jsonrpc/client/client.go                  | 56 +++++++++++++++++--------------
 jsonrpc/client/message_dispatcher.go      | 15 +++++----
 jsonrpc/client/message_dispatcher_test.go | 13 +++----
 3 files changed, 43 insertions(+), 41 deletions(-)

(limited to 'jsonrpc')

diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go
index 67ed307..afcc5c7 100644
--- a/jsonrpc/client/client.go
+++ b/jsonrpc/client/client.go
@@ -30,11 +30,11 @@ type RPCClientConfig struct {
 
 // RPCClient RPC Client
 type RPCClient struct {
-	config      RPCClientConfig
-	conn        *websocket.Conn
-	requests    *messageDispatcher[message.RequestID, message.Response]
-	subscriber  *messageDispatcher[message.SubscriptionID, json.RawMessage]
-	stop_signal chan struct{}
+	config        RPCClientConfig
+	conn          *websocket.Conn
+	requests      *messageDispatcher[message.RequestID, message.Response]
+	subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage]
+	stop_signal   chan struct{}
 }
 
 // NewRPCClient Creates a new instance of RPCClient with the provided configuration.
@@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
 		config.Timeout = DefaultTimeout
 	}
 
-	stop_signal := make(chan struct{}, 2)
+	stop_signal := make(chan struct{})
+
+	requests := newMessageDispatcher[message.RequestID, message.Response](0)
+	subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100)
 
 	client := &RPCClient{
-		conn:        conn,
-		config:      config,
-		requests:    newMessageDispatcher[message.RequestID, message.Response](1),
-		subscriber:  newMessageDispatcher[message.SubscriptionID, json.RawMessage](10),
-		stop_signal: stop_signal,
+		conn:          conn,
+		config:        config,
+		requests:      requests,
+		subscriptions: subscriptions,
+		stop_signal:   stop_signal,
 	}
 
 	go func() {
@@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
 func (client *RPCClient) Close() {
 	log.Warn("Close the rpc client...")
 	// Send stop signal to the background receiving loop
-	client.stop_signal <- struct{}{}
+	close(client.stop_signal)
 
 	// Close the underlying websocket connection
 	err := client.conn.Close()
@@ -82,7 +85,7 @@ func (client *RPCClient) Close() {
 	}
 
 	client.requests.clear()
-	client.subscriber.clear()
+	client.subscriptions.clear()
 }
 
 // Call Sends an RPC call to the server with the specified method and
@@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript
 	}
 
 	// Register a new subscription
-	sub := client.subscriber.register(subID)
+	sub := client.subscriptions.register(subID)
 
 	return subID, sub, nil
 }
@@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID
 	}
 
 	// On success unregister the subscription channel
-	client.subscriber.unregister(subID)
+	client.subscriptions.unregister(subID)
 
 	return nil
 }
 
 // backgroundReceivingLoop Starts reading new messages from the underlying connection.
 func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error {
-
 	log.Debug("Background loop started")
 
-	new_msg_ch := make(chan []byte)
-	receive_err_ch := make(chan error)
+	new_msg := make(chan []byte)
+	receive_err := make(chan error)
 
 	// Start listing for new messages
 	go func() {
 		for {
 			_, msg, err := client.conn.ReadMessage()
 			if err != nil {
-				receive_err_ch <- err
+				receive_err <- err
 				return
 			}
-			new_msg_ch <- msg
+			new_msg <- msg
 		}
 	}()
 
@@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er
 		case <-stop_signal:
 			log.Warn("Stopping background receiving loop: received stop signal")
 			return nil
-		case msg := <-new_msg_ch:
+		case msg := <-new_msg:
 			err := client.handleNewMsg(msg)
 			if err != nil {
 				log.WithError(err).Error("Handle a new received msg")
 			}
-		case err := <-receive_err_ch:
+		case err := <-receive_err:
 			log.WithError(err).Error("Receive a new msg")
 			return err
 		}
@@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
 			return fmt.Errorf("Response doesn't have an id")
 		}
 
-		err := client.requests.disptach(*response.ID, response)
+		err := client.requests.dispatch(*response.ID, response)
 		if err != nil {
 			return fmt.Errorf("Dispatch a response: %w", err)
 		}
@@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
 			return fmt.Errorf("Failed to unmarshal notification params: %w", err)
 		}
 
-		// Send the notification to the subscription
-		err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result)
+		// TODO: Consider using a more efficient design here because,
+		// on each registration, messageDispatcher will create a new subscription
+		// channel with the provided buffer size. If the buffer for the subscription
+		// channel is full, calling the dispatch method will block and in this
+		// case the client will not be able to handle any more messages.
+		err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result)
 		if err != nil {
 			return fmt.Errorf("Dispatch a notification: %w", err)
 		}
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go
index dab75ca..9177f6e 100644
--- a/jsonrpc/client/message_dispatcher.go
+++ b/jsonrpc/client/message_dispatcher.go
@@ -40,17 +40,18 @@ func (c *messageDispatcher[K, V]) length() int {
 	return len(c.chans)
 }
 
-// disptach Disptaches the msg to the channel with the given key
-func (c *messageDispatcher[K, V]) disptach(key K, msg V) error {
+// dispatch Disptaches the msg to the channel with the given key
+func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error {
 	c.Lock()
-	defer c.Unlock()
+	ch, ok := c.chans[key]
+	c.Unlock()
 
-	if ch, ok := c.chans[key]; ok {
-		ch <- msg
-		return nil
+	if !ok {
+		return fmt.Errorf("Channel not found")
 	}
 
-	return fmt.Errorf("Channel not found")
+	ch <- msg
+	return nil
 }
 
 // unregister Unregisters the channel with the provided key
diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go
index 7cc1366..a1fc1c6 100644
--- a/jsonrpc/client/message_dispatcher_test.go
+++ b/jsonrpc/client/message_dispatcher_test.go
@@ -1,9 +1,6 @@
 package client
 
 import (
-	// "sync"
-	// "sync/atomic"
-
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -12,7 +9,6 @@ import (
 )
 
 func TestDispatchToChannel(t *testing.T) {
-
 	messageDispatcher := newMessageDispatcher[int, int](10)
 
 	chanKey := 1
@@ -26,7 +22,7 @@ func TestDispatchToChannel(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		for i := 0; i < 50; i++ {
-			err := messageDispatcher.disptach(chanKey, i)
+			err := messageDispatcher.dispatch(chanKey, i)
 			assert.Nil(t, err)
 		}
 
@@ -37,7 +33,7 @@ func TestDispatchToChannel(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		for i := 0; i < 50; i++ {
-			err := messageDispatcher.disptach(chanKey2, i)
+			err := messageDispatcher.dispatch(chanKey2, i)
 			assert.Nil(t, err)
 		}
 
@@ -79,12 +75,11 @@ func TestUnregisterChannel(t *testing.T) {
 	_, ok := <-rx
 	assert.False(t, ok, "chan closed")
 
-	err := messageDispatcher.disptach(chanKey, 1)
+	err := messageDispatcher.dispatch(chanKey, 1)
 	assert.NotNil(t, err)
 }
 
 func TestClearChannels(t *testing.T) {
-
 	messageDispatcher := newMessageDispatcher[int, int](1)
 
 	chanKey := 1
@@ -96,6 +91,6 @@ func TestClearChannels(t *testing.T) {
 	_, ok := <-rx
 	assert.False(t, ok, "chan closed")
 
-	err := messageDispatcher.disptach(chanKey, 1)
+	err := messageDispatcher.dispatch(chanKey, 1)
 	assert.NotNil(t, err)
 }
-- 
cgit v1.2.3