aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--example_test.go70
-rw-r--r--jsonrpc/client/client.go56
-rw-r--r--jsonrpc/client/message_dispatcher.go15
-rw-r--r--jsonrpc/client/message_dispatcher_test.go13
4 files changed, 43 insertions, 111 deletions
diff --git a/example_test.go b/example_test.go
deleted file mode 100644
index ca4a642..0000000
--- a/example_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package karyongo
-
-import (
- "encoding/json"
- "math/rand/v2"
- "os"
- "time"
-
- rpc "github.com/karyontech/karyon-go/jsonrpc/client"
- log "github.com/sirupsen/logrus"
-)
-
-type Pong struct{}
-
-func runNewClient() error {
- config := rpc.RPCClientConfig{
- Addr: "ws://127.0.0.1:6000",
- }
-
- client, err := rpc.NewRPCClient(config)
- if err != nil {
- return err
- }
- defer client.Close()
-
- subID, ch, err := client.Subscribe("Calc.log_subscribe", nil)
- if err != nil {
- return err
- }
- log.Infof("Subscribed successfully: %d\n", subID)
-
- go func() {
- for notification := range ch {
- log.Infof("Receive new notification: %s\n", notification)
- }
- }()
-
- for {
- millisecond := rand.IntN(2000-500) + 500
- time.Sleep(time.Duration(millisecond) * time.Millisecond)
- result, err := client.Call("Calc.ping", nil)
- if err != nil {
- return err
- }
-
- pongMsg := Pong{}
- err = json.Unmarshal(*result, &pongMsg)
- if err != nil {
- return err
- }
- }
-
-}
-
-func main() {
- lvl, ok := os.LookupEnv("LOG_LEVEL")
- if !ok {
- lvl = "debug"
- }
- ll, err := log.ParseLevel(lvl)
- if err != nil {
- ll = log.DebugLevel
- }
- log.SetLevel(ll)
-
- err = runNewClient()
- if err != nil {
- log.Fatal(err)
- }
-}
diff --git a/jsonrpc/client/client.go b/jsonrpc/client/client.go
index 67ed307..afcc5c7 100644
--- a/jsonrpc/client/client.go
+++ b/jsonrpc/client/client.go
@@ -30,11 +30,11 @@ type RPCClientConfig struct {
// RPCClient RPC Client
type RPCClient struct {
- config RPCClientConfig
- conn *websocket.Conn
- requests *messageDispatcher[message.RequestID, message.Response]
- subscriber *messageDispatcher[message.SubscriptionID, json.RawMessage]
- stop_signal chan struct{}
+ config RPCClientConfig
+ conn *websocket.Conn
+ requests *messageDispatcher[message.RequestID, message.Response]
+ subscriptions *messageDispatcher[message.SubscriptionID, json.RawMessage]
+ stop_signal chan struct{}
}
// NewRPCClient Creates a new instance of RPCClient with the provided configuration.
@@ -50,14 +50,17 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
config.Timeout = DefaultTimeout
}
- stop_signal := make(chan struct{}, 2)
+ stop_signal := make(chan struct{})
+
+ requests := newMessageDispatcher[message.RequestID, message.Response](0)
+ subscriptions := newMessageDispatcher[message.SubscriptionID, json.RawMessage](100)
client := &RPCClient{
- conn: conn,
- config: config,
- requests: newMessageDispatcher[message.RequestID, message.Response](1),
- subscriber: newMessageDispatcher[message.SubscriptionID, json.RawMessage](10),
- stop_signal: stop_signal,
+ conn: conn,
+ config: config,
+ requests: requests,
+ subscriptions: subscriptions,
+ stop_signal: stop_signal,
}
go func() {
@@ -73,7 +76,7 @@ func NewRPCClient(config RPCClientConfig) (*RPCClient, error) {
func (client *RPCClient) Close() {
log.Warn("Close the rpc client...")
// Send stop signal to the background receiving loop
- client.stop_signal <- struct{}{}
+ close(client.stop_signal)
// Close the underlying websocket connection
err := client.conn.Close()
@@ -82,7 +85,7 @@ func (client *RPCClient) Close() {
}
client.requests.clear()
- client.subscriber.clear()
+ client.subscriptions.clear()
}
// Call Sends an RPC call to the server with the specified method and
@@ -118,7 +121,7 @@ func (client *RPCClient) Subscribe(method string, params any) (message.Subscript
}
// Register a new subscription
- sub := client.subscriber.register(subID)
+ sub := client.subscriptions.register(subID)
return subID, sub, nil
}
@@ -133,28 +136,27 @@ func (client *RPCClient) Unsubscribe(method string, subID message.SubscriptionID
}
// On success unregister the subscription channel
- client.subscriber.unregister(subID)
+ client.subscriptions.unregister(subID)
return nil
}
// backgroundReceivingLoop Starts reading new messages from the underlying connection.
func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) error {
-
log.Debug("Background loop started")
- new_msg_ch := make(chan []byte)
- receive_err_ch := make(chan error)
+ new_msg := make(chan []byte)
+ receive_err := make(chan error)
// Start listing for new messages
go func() {
for {
_, msg, err := client.conn.ReadMessage()
if err != nil {
- receive_err_ch <- err
+ receive_err <- err
return
}
- new_msg_ch <- msg
+ new_msg <- msg
}
}()
@@ -163,12 +165,12 @@ func (client *RPCClient) backgroundReceivingLoop(stop_signal <-chan struct{}) er
case <-stop_signal:
log.Warn("Stopping background receiving loop: received stop signal")
return nil
- case msg := <-new_msg_ch:
+ case msg := <-new_msg:
err := client.handleNewMsg(msg)
if err != nil {
log.WithError(err).Error("Handle a new received msg")
}
- case err := <-receive_err_ch:
+ case err := <-receive_err:
log.WithError(err).Error("Receive a new msg")
return err
}
@@ -187,7 +189,7 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Response doesn't have an id")
}
- err := client.requests.disptach(*response.ID, response)
+ err := client.requests.dispatch(*response.ID, response)
if err != nil {
return fmt.Errorf("Dispatch a response: %w", err)
}
@@ -204,8 +206,12 @@ func (client *RPCClient) handleNewMsg(msg []byte) error {
return fmt.Errorf("Failed to unmarshal notification params: %w", err)
}
- // Send the notification to the subscription
- err := client.subscriber.disptach(ntRes.Subscription, *ntRes.Result)
+ // TODO: Consider using a more efficient design here because,
+ // on each registration, messageDispatcher will create a new subscription
+ // channel with the provided buffer size. If the buffer for the subscription
+ // channel is full, calling the dispatch method will block and in this
+ // case the client will not be able to handle any more messages.
+ err := client.subscriptions.dispatch(ntRes.Subscription, *ntRes.Result)
if err != nil {
return fmt.Errorf("Dispatch a notification: %w", err)
}
diff --git a/jsonrpc/client/message_dispatcher.go b/jsonrpc/client/message_dispatcher.go
index dab75ca..9177f6e 100644
--- a/jsonrpc/client/message_dispatcher.go
+++ b/jsonrpc/client/message_dispatcher.go
@@ -40,17 +40,18 @@ func (c *messageDispatcher[K, V]) length() int {
return len(c.chans)
}
-// disptach Disptaches the msg to the channel with the given key
-func (c *messageDispatcher[K, V]) disptach(key K, msg V) error {
+// dispatch Disptaches the msg to the channel with the given key
+func (c *messageDispatcher[K, V]) dispatch(key K, msg V) error {
c.Lock()
- defer c.Unlock()
+ ch, ok := c.chans[key]
+ c.Unlock()
- if ch, ok := c.chans[key]; ok {
- ch <- msg
- return nil
+ if !ok {
+ return fmt.Errorf("Channel not found")
}
- return fmt.Errorf("Channel not found")
+ ch <- msg
+ return nil
}
// unregister Unregisters the channel with the provided key
diff --git a/jsonrpc/client/message_dispatcher_test.go b/jsonrpc/client/message_dispatcher_test.go
index 7cc1366..a1fc1c6 100644
--- a/jsonrpc/client/message_dispatcher_test.go
+++ b/jsonrpc/client/message_dispatcher_test.go
@@ -1,9 +1,6 @@
package client
import (
- // "sync"
- // "sync/atomic"
-
"sync"
"sync/atomic"
"testing"
@@ -12,7 +9,6 @@ import (
)
func TestDispatchToChannel(t *testing.T) {
-
messageDispatcher := newMessageDispatcher[int, int](10)
chanKey := 1
@@ -26,7 +22,7 @@ func TestDispatchToChannel(t *testing.T) {
wg.Add(1)
go func() {
for i := 0; i < 50; i++ {
- err := messageDispatcher.disptach(chanKey, i)
+ err := messageDispatcher.dispatch(chanKey, i)
assert.Nil(t, err)
}
@@ -37,7 +33,7 @@ func TestDispatchToChannel(t *testing.T) {
wg.Add(1)
go func() {
for i := 0; i < 50; i++ {
- err := messageDispatcher.disptach(chanKey2, i)
+ err := messageDispatcher.dispatch(chanKey2, i)
assert.Nil(t, err)
}
@@ -79,12 +75,11 @@ func TestUnregisterChannel(t *testing.T) {
_, ok := <-rx
assert.False(t, ok, "chan closed")
- err := messageDispatcher.disptach(chanKey, 1)
+ err := messageDispatcher.dispatch(chanKey, 1)
assert.NotNil(t, err)
}
func TestClearChannels(t *testing.T) {
-
messageDispatcher := newMessageDispatcher[int, int](1)
chanKey := 1
@@ -96,6 +91,6 @@ func TestClearChannels(t *testing.T) {
_, ok := <-rx
assert.False(t, ok, "chan closed")
- err := messageDispatcher.disptach(chanKey, 1)
+ err := messageDispatcher.dispatch(chanKey, 1)
assert.NotNil(t, err)
}