aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/client/client.go
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-15 05:46:08 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-15 05:46:08 +0200
commit1a221d4d37f4bdee1fb45141828e201948e8ddd3 (patch)
treea6ed1a19e98c336bc598d4bfe97fe3965d14e2c8 /jsonrpc/client/client.go
parente9af9bc115e0869570b9b79e1610b2bc08abe5a1 (diff)
fix typos
Diffstat (limited to 'jsonrpc/client/client.go')
-rw-r--r--jsonrpc/client/client.go56
1 files changed, 31 insertions, 25 deletions
diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go
index 67ed307..afcc5c7 100644
--- a/jsonrpc/client/client.go
+++ b/jsonrpc/client/client.go
@@ -30,11 +30,11 @@ type RPCClientConfig struct {
// RPCClient RPC Client
type RPCClient struct {
- config RPCClientConfig
- conn *websocket.Conn
- requests *messageDispatcher[message.RequestID, message.Response]
- subscriber *messageDispatcher[message.SubscriptionID, json.RawMessage]
- stop_signal chan struct{}
+ config RPCClientConfig
+ conn *websocket.Conn
+ requests *messageDispatcher[message.RequestID, message.Response]
+ subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage]
+ stop_signal chan struct{}
}
// NewRPCClient Creates a new instance of RPCClient with the provided configuration.
@@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
config.Timeout = DefaultTimeout
}
- stop_signal := make(chan struct{}, 2)
+ stop_signal := make(chan struct{})
+
+ requests := newMessageDispatcher[message.RequestID, message.Response](0)
+ subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100)
client := &RPCClient{
- conn: conn,
- config: config,
- requests: newMessageDispatcher[message.RequestID, message.Response](1),
- subscriber: newMessageDispatcher[message.SubscriptionID, json.RawMessage](10),
- stop_signal: stop_signal,
+ conn: conn,
+ config: config,
+ requests: requests,
+ subscriptions: subscriptions,
+ stop_signal: stop_signal,
}
go func() {
@@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
func (client *RPCClient) Close() {
log.Warn("Close the rpc client...")
// Send stop signal to the background receiving loop
- client.stop_signal <- struct{}{}
+ close(client.stop_signal)
// Close the underlying websocket connection
err := client.conn.Close()
@@ -82,7 +85,7 @@ func (client *RPCClient) Close() {
}
client.requests.clear()
- client.subscriber.clear()
+ client.subscriptions.clear()
}
// Call Sends an RPC call to the server with the specified method and
@@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript
}
// Register a new subscription
- sub := client.subscriber.register(subID)
+ sub := client.subscriptions.register(subID)
return subID, sub, nil
}
@@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID
}
// On success unregister the subscription channel
- client.subscriber.unregister(subID)
+ client.subscriptions.unregister(subID)
return nil
}
// backgroundReceivingLoop Starts reading new messages from the underlying connection.
func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error {
-
log.Debug("Background loop started")
- new_msg_ch := make(chan []byte)
- receive_err_ch := make(chan error)
+ new_msg := make(chan []byte)
+ receive_err := make(chan error)
// Start listing for new messages
go func() {
for {
_, msg, err := client.conn.ReadMessage()
if err != nil {
- receive_err_ch <- err
+ receive_err <- err
return
}
- new_msg_ch <- msg
+ new_msg <- msg
}
}()
@@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er
case <-stop_signal:
log.Warn("Stopping background receiving loop: received stop signal")
return nil
- case msg := <-new_msg_ch:
+ case msg := <-new_msg:
err := client.handleNewMsg(msg)
if err != nil {
log.WithError(err).Error("Handle a new received msg")
}
- case err := <-receive_err_ch:
+ case err := <-receive_err:
log.WithError(err).Error("Receive a new msg")
return err
}
@@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Response doesn't have an id")
}
- err := client.requests.disptach(*response.ID, response)
+ err := client.requests.dispatch(*response.ID, response)
if err != nil {
return fmt.Errorf("Dispatch a response: %w", err)
}
@@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Failed to unmarshal notification params: %w", err)
}
- // Send the notification to the subscription
- err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result)
+ // TODO: Consider using a more efficient design here because,
+ // on each registration, messageDispatcher will create a new subscription
+ // channel with the provided buffer size. If the buffer for the subscription
+ // channel is full, calling the dispatch method will block and in this
+ // case the client will not be able to handle any more messages.
+ err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result)
if err != nil {
return fmt.Errorf("Dispatch a notification: %w", err)
}