From b8b5f00e9695f46ea30af3ce63aec6dd17f356ae Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 27 Jun 2024 02:39:31 +0200 Subject: Improve async channels error handling and replace unbounded channels with bounded channels Remove all unbounded channels to prevent unbounded memory usage and potential crashes. Use `FuturesUnordered` for sending to multiple channels simultaneously. This prevents the sending loop from blocking if one channel is blocked, and helps handle errors properly. --- jsonrpc/src/server/mod.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'jsonrpc/src') diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index 8fa8a1c..8d5cd2c 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -33,6 +33,8 @@ pub const FAILED_TO_PARSE_ERROR_MSG: &str = "Failed to parse"; pub const METHOD_NOT_FOUND_ERROR_MSG: &str = "Method not found"; pub const UNSUPPORTED_JSONRPC_VERSION: &str = "Unsupported jsonrpc version"; +const CHANNEL_SUBSCRIPTION_BUFFER_SIZE: usize = 100; + struct NewRequest { srvc_name: String, method_name: String, @@ -108,7 +110,7 @@ impl Server { let conn = Arc::new(conn); - let (ch_tx, ch_rx) = async_channel::unbounded(); + let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_SUBSCRIPTION_BUFFER_SIZE); // Create a new connection channel for managing subscriptions let channel = Channel::new(ch_tx); @@ -120,13 +122,13 @@ impl Server { if let TaskResult::Completed(Err(err)) = result { debug!("Notification loop stopped: {err}"); } - // Close the connection subscription channel + // Close the connection channel chan.close(); }; let conn_cloned = conn.clone(); let queue_cloned = queue.clone(); - // Start listening for responses in the queue or new notifications + // Start listening for new responses in the queue or new notifications self.task_group.spawn( async move { loop { @@ -163,12 +165,12 @@ impl Server { } else { warn!("Connection {} dropped", endpoint); } - // Close the subscription channel when the connection dropped + // Close the connection channel when the connection dropped chan.close(); }; let selfc = self.clone(); - // Spawn a new task and wait for requests. + // Spawn a new task and wait for new requests. self.task_group.spawn( async move { loop { -- cgit v1.2.3