aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-15 05:46:08 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-15 05:46:08 +0200
commit1a221d4d37f4bdee1fb45141828e201948e8ddd3 (patch)
treea6ed1a19e98c336bc598d4bfe97fe3965d14e2c8 /jsonrpc/client
parente9af9bc115e0869570b9b79e1610b2bc08abe5a1 (diff)
fix typos
Diffstat (limited to 'jsonrpc/client')
-rw-r--r--jsonrpc/client/client.go56
-rw-r--r--jsonrpc/client/message_dispatcher.go15
-rw-r--r--jsonrpc/client/message_dispatcher_test.go13
3 files changed, 43 insertions, 41 deletions
diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go
index 67ed307..afcc5c7 100644
--- a/jsonrpc/client/client.go
+++ b/jsonrpc/client/client.go
@@ -30,11 +30,11 @@ type RPCClientConfig struct {
// RPCClient RPC Client
type RPCClient struct {
- config RPCClientConfig
- conn *websocket.Conn
- requests *messageDispatcher[message.RequestID, message.Response]
- subscriber *messageDispatcher[message.SubscriptionID, json.RawMessage]
- stop_signal chan struct{}
+ config RPCClientConfig
+ conn *websocket.Conn
+ requests *messageDispatcher[message.RequestID, message.Response]
+ subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage]
+ stop_signal chan struct{}
}
// NewRPCClient Creates a new instance of RPCClient with the provided configuration.
@@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
config.Timeout = DefaultTimeout
}
- stop_signal := make(chan struct{}, 2)
+ stop_signal := make(chan struct{})
+
+ requests := newMessageDispatcher[message.RequestID, message.Response](0)
+ subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100)
client := &RPCClient{
- conn: conn,
- config: config,
- requests: newMessageDispatcher[message.RequestID, message.Response](1),
- subscriber: newMessageDispatcher[message.SubscriptionID, json.RawMessage](10),
- stop_signal: stop_signal,
+ conn: conn,
+ config: config,
+ requests: requests,
+ subscriptions: subscriptions,
+ stop_signal: stop_signal,
}
go func() {
@@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
func (client *RPCClient) Close() {
log.Warn("Close the rpc client...")
// Send stop signal to the background receiving loop
- client.stop_signal <- struct{}{}
+ close(client.stop_signal)
// Close the underlying websocket connection
err := client.conn.Close()
@@ -82,7 +85,7 @@ func (client *RPCClient) Close() {
}
client.requests.clear()
- client.subscriber.clear()
+ client.subscriptions.clear()
}
// Call Sends an RPC call to the server with the specified method and
@@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript
}
// Register a new subscription
- sub := client.subscriber.register(subID)
+ sub := client.subscriptions.register(subID)
return subID, sub, nil
}
@@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID
}
// On success unregister the subscription channel
- client.subscriber.unregister(subID)
+ client.subscriptions.unregister(subID)
return nil
}
// backgroundReceivingLoop Starts reading new messages from the underlying connection.
func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error {
-
log.Debug("Background loop started")
- new_msg_ch := make(chan []byte)
- receive_err_ch := make(chan error)
+ new_msg := make(chan []byte)
+ receive_err := make(chan error)
// Start listing for new messages
go func() {
for {
_, msg, err := client.conn.ReadMessage()
if err != nil {
- receive_err_ch <- err
+ receive_err <- err
return
}
- new_msg_ch <- msg
+ new_msg <- msg
}
}()
@@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er
case <-stop_signal:
log.Warn("Stopping background receiving loop: received stop signal")
return nil
- case msg := <-new_msg_ch:
+ case msg := <-new_msg:
err := client.handleNewMsg(msg)
if err != nil {
log.WithError(err).Error("Handle a new received msg")
}
- case err := <-receive_err_ch:
+ case err := <-receive_err:
log.WithError(err).Error("Receive a new msg")
return err
}
@@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Response doesn't have an id")
}
- err := client.requests.disptach(*response.ID, response)
+ err := client.requests.dispatch(*response.ID, response)
if err != nil {
return fmt.Errorf("Dispatch a response: %w", err)
}
@@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Failed to unmarshal notification params: %w", err)
}
- // Send the notification to the subscription
- err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result)
+ // TODO: Consider using a more efficient design here because,
+ // on each registration, messageDispatcher will create a new subscription
+ // channel with the provided buffer size. If the buffer for the subscription
+ // channel is full, calling the dispatch method will block and in this
+ // case the client will not be able to handle any more messages.
+ err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result)
if err != nil {
return fmt.Errorf("Dispatch a notification: %w", err)
}
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go
index dab75ca..9177f6e 100644
--- a/jsonrpc/client/message_dispatcher.go
+++ b/jsonrpc/client/message_dispatcher.go
@@ -40,17 +40,18 @@ func (c *messageDispatcher[K, V]) length() int {
return len(c.chans)
}
-// disptach Disptaches the msg to the channel with the given key
-func (c *messageDispatcher[K, V]) disptach(key K, msg V) error {
+// dispatch Disptaches the msg to the channel with the given key
+func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error {
c.Lock()
- defer c.Unlock()
+ ch, ok := c.chans[key]
+ c.Unlock()
- if ch, ok := c.chans[key]; ok {
- ch <- msg
- return nil
+ if !ok {
+ return fmt.Errorf("Channel not found")
}
- return fmt.Errorf("Channel not found")
+ ch <- msg
+ return nil
}
// unregister Unregisters the channel with the provided key
diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go
index 7cc1366..a1fc1c6 100644
--- a/jsonrpc/client/message_dispatcher_test.go
+++ b/jsonrpc/client/message_dispatcher_test.go
@@ -1,9 +1,6 @@
package client
import (
- // "sync"
- // "sync/atomic"
-
"sync"
"sync/atomic"
"testing"
@@ -12,7 +9,6 @@ import (
)
func TestDispatchToChannel(t *testing.T) {
-
messageDispatcher := newMessageDispatcher[int, int](10)
chanKey := 1
@@ -26,7 +22,7 @@ func TestDispatchToChannel(t *testing.T) {
wg.Add(1)
go func() {
for i := 0; i < 50; i++ {
- err := messageDispatcher.disptach(chanKey, i)
+ err := messageDispatcher.dispatch(chanKey, i)
assert.Nil(t, err)
}
@@ -37,7 +33,7 @@ func TestDispatchToChannel(t *testing.T) {
wg.Add(1)
go func() {
for i := 0; i < 50; i++ {
- err := messageDispatcher.disptach(chanKey2, i)
+ err := messageDispatcher.dispatch(chanKey2, i)
assert.Nil(t, err)
}
@@ -79,12 +75,11 @@ func TestUnregisterChannel(t *testing.T) {
_, ok := <-rx
assert.False(t, ok, "chan closed")
- err := messageDispatcher.disptach(chanKey, 1)
+ err := messageDispatcher.dispatch(chanKey, 1)
assert.NotNil(t, err)
}
func TestClearChannels(t *testing.T) {
-
messageDispatcher := newMessageDispatcher[int, int](1)
chanKey := 1
@@ -96,6 +91,6 @@ func TestClearChannels(t *testing.T) {
_, ok := <-rx
assert.False(t, ok, "chan closed")
- err := messageDispatcher.disptach(chanKey, 1)
+ err := messageDispatcher.dispatch(chanKey, 1)
assert.NotNil(t, err)
}