diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-27 02:39:31 +0200 |
commit | b8b5f00e9695f46ea30af3ce63aec6dd17f356ae (patch) | |
tree | 3f1b07539c248f9536f5c7b6e3870e235d4f49d7 /jsonrpc | |
parent | 1a3ef2d77ab54bfe286f7400ac0cee2e25ea14e3 (diff) |
Improve async channels error handling and replace unbounded channels with bounded channels
Remove all unbounded channels to prevent unbounded memory usage and
potential crashes.
Use `FuturesUnordered` for sending to multiple channels simultaneously.
This prevents the sending loop from blocking if one channel is blocked,
and helps handle errors properly.
Diffstat (limited to 'jsonrpc')
-rw-r--r-- | jsonrpc/src/server/mod.rs | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 8fa8a1c..8d5cd2c 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -33,6 +33,8 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; pub const UNSUPPORTED_JSONRPC_VERSION: &str = "Unsupported jsonrpc version"; +const CHANNEL_SUBSCRIPTION_BUFFER_SIZE: usize = 100; + struct NewRequest { srvc_name: String, method_name: String, @@ -108,7 +110,7 @@ impl Server { let conn = Arc::new(conn); - let (ch_tx, ch_rx) = async_channel::unbounded(); + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_SUBSCRIPTION_BUFFER_SIZE); // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); @@ -120,13 +122,13 @@ impl Server { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } - // Close the connection subscription channel + // Close the connection channel chan.close(); }; let conn_cloned = conn.clone(); let queue_cloned = queue.clone(); - // Start listening for responses in the queue or new notifications + // Start listening for new responses in the queue or new notifications self.task_group.spawn( async move { loop { @@ -163,12 +165,12 @@ impl Server { } else { warn!("Connection {} dropped", endpoint); } - // Close the subscription channel when the connection dropped + // Close the connection channel when the connection dropped chan.close(); }; let selfc = self.clone(); - // Spawn a new task and wait for requests. + // Spawn a new task and wait for new requests. self.task_group.spawn( async move { loop { |