aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-27 02:39:31 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-27 02:39:31 +0200
commitb8b5f00e9695f46ea30af3ce63aec6dd17f356ae (patch)
tree3f1b07539c248f9536f5c7b6e3870e235d4f49d7 /jsonrpc/src/server/mod.rs
parent1a3ef2d77ab54bfe286f7400ac0cee2e25ea14e3 (diff)
Improve async channels error handling and replace unbounded channels with bounded channels
Remove all unbounded channels to prevent unbounded memory usage and potential crashes. Use `FuturesUnordered` for sending to multiple channels simultaneously. This prevents the sending loop from blocking if one channel is blocked, and helps handle errors properly.
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
-rw-r--r--jsonrpc/src/server/mod.rs12
1 files changed, 7 insertions, 5 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 8fa8a1c..8d5cd2c 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -33,6 +33,8 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse";
pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found";
pub const UNSUPPORTED_JSONRPC_VERSION: &str = "Unsupported jsonrpc version";
+const CHANNEL_SUBSCRIPTION_BUFFER_SIZE: usize = 100;
+
struct NewRequest {
srvc_name: String,
method_name: String,
@@ -108,7 +110,7 @@ impl Server {
let conn = Arc::new(conn);
- let (ch_tx, ch_rx) = async_channel::unbounded();
+ let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_SUBSCRIPTION_BUFFER_SIZE);
// Create a new connection channel for managing subscriptions
let channel = Channel::new(ch_tx);
@@ -120,13 +122,13 @@ impl Server {
if let TaskResult::Completed(Err(err)) = result {
debug!("Notification loop stopped: {err}");
}
- // Close the connection subscription channel
+ // Close the connection channel
chan.close();
};
let conn_cloned = conn.clone();
let queue_cloned = queue.clone();
- // Start listening for responses in the queue or new notifications
+ // Start listening for new responses in the queue or new notifications
self.task_group.spawn(
async move {
loop {
@@ -163,12 +165,12 @@ impl Server {
} else {
warn!("Connection {} dropped", endpoint);
}
- // Close the subscription channel when the connection dropped
+ // Close the connection channel when the connection dropped
chan.close();
};
let selfc = self.clone();
- // Spawn a new task and wait for requests.
+ // Spawn a new task and wait for new requests.
self.task_group.spawn(
async move {
loop {