From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@proton.me>
Date: Wed, 15 Nov 2023 17:16:39 +0300
Subject: improve the TaskGroup API

the TaskGroup now holds an Executor instead of passing it when calling
its spawn method

also, define a global executor `Executor<'static>` and use static
lifetime instead of a lifetime placeholder

This improvement simplify the code for spawning a new task. There is no
need to pass the executor around.
---
 p2p/examples/chat.rs       | 44 ++++++++++++++++++----------------
 p2p/examples/monitor.rs    | 60 ++++++++++++++++++++++------------------------
 p2p/examples/peer.rs       | 43 ++++++++++++++++-----------------
 p2p/examples/shared/mod.rs | 33 +++++++++++++++++++++++++
 4 files changed, 106 insertions(+), 74 deletions(-)
 create mode 100644 p2p/examples/shared/mod.rs

(limited to 'p2p/examples')

diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 4358362..907ba06 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -1,9 +1,11 @@
+mod shared;
+
 use std::sync::Arc;
 
 use async_std::io;
 use async_trait::async_trait;
 use clap::Parser;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
 
 use karyons_net::{Endpoint, Port};
 
@@ -12,6 +14,8 @@ use karyons_p2p::{
     ArcPeer, Backend, Config, P2pError, PeerID, Version,
 };
 
+use shared::run_executor;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
@@ -109,33 +113,33 @@ fn main() {
         ..Default::default()
     };
 
+    // Create a new Executor
+    let ex = Arc::new(Executor::new());
+
     // Create a new Backend
-    let backend = Backend::new(peer_id, config);
+    let backend = Backend::new(peer_id, config, ex.clone());
 
     let (ctrlc_s, ctrlc_r) = channel::unbounded();
     let handle = move || ctrlc_s.try_send(()).unwrap();
     ctrlc::set_handler(handle).unwrap();
 
-    // Create a new Executor
-    let ex = Arc::new(Executor::new());
-
-    let ex_cloned = ex.clone();
-    let task = ex.spawn(async {
-        let username = cli.username;
-
-        // Attach the ChatProtocol
-        let c = move |peer| ChatProtocol::new(&username, peer);
-        backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+    run_executor(
+        async {
+            let username = cli.username;
 
-        // Run the backend
-        backend.run(ex_cloned).await.unwrap();
+            // Attach the ChatProtocol
+            let c = move |peer| ChatProtocol::new(&username, peer);
+            backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
 
-        // Wait for ctrlc signal
-        ctrlc_r.recv().await.unwrap();
+            // Run the backend
+            backend.run().await.unwrap();
 
-        // Shutdown the backend
-        backend.shutdown().await;
-    });
+            // Wait for ctrlc signal
+            ctrlc_r.recv().await.unwrap();
 
-    future::block_on(ex.run(task));
+            // Shutdown the backend
+            backend.shutdown().await;
+        },
+        ex,
+    );
 }
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index cd4defc..fc48c2f 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -1,13 +1,16 @@
+mod shared;
+
 use std::sync::Arc;
 
 use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
 
 use karyons_net::{Endpoint, Port};
 
 use karyons_p2p::{Backend, Config, PeerID};
 
+use shared::run_executor;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
@@ -50,44 +53,39 @@ fn main() {
         ..Default::default()
     };
 
+    // Create a new Executor
+    let ex = Arc::new(Executor::new());
+
     // Create a new Backend
-    let backend = Backend::new(peer_id, config);
+    let backend = Backend::new(peer_id, config, ex.clone());
 
     let (ctrlc_s, ctrlc_r) = channel::unbounded();
     let handle = move || ctrlc_s.try_send(()).unwrap();
     ctrlc::set_handler(handle).unwrap();
 
-    let (signal, shutdown) = channel::unbounded::<()>();
-
-    // Create a new Executor
-    let ex = Arc::new(Executor::new());
-
-    let task = async {
-        let monitor = backend.monitor().await;
+    let exc = ex.clone();
+    run_executor(
+        async {
+            let monitor = backend.monitor().await;
 
-        let monitor_task = ex.spawn(async move {
-            loop {
-                let event = monitor.recv().await.unwrap();
-                println!("{}", event);
-            }
-        });
+            let monitor_task = exc.spawn(async move {
+                loop {
+                    let event = monitor.recv().await.unwrap();
+                    println!("{}", event);
+                }
+            });
 
-        // Run the backend
-        backend.run(ex.clone()).await.unwrap();
+            // Run the backend
+            backend.run().await.unwrap();
 
-        // Wait for ctrlc signal
-        ctrlc_r.recv().await.unwrap();
+            // Wait for ctrlc signal
+            ctrlc_r.recv().await.unwrap();
 
-        // Shutdown the backend
-        backend.shutdown().await;
-
-        monitor_task.cancel().await;
-
-        drop(signal);
-    };
+            // Shutdown the backend
+            backend.shutdown().await;
 
-    // Run four executor threads.
-    Parallel::new()
-        .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
-        .finish(|| future::block_on(task));
+            monitor_task.cancel().await;
+        },
+        ex,
+    );
 }
diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs
index f805d68..5ff365d 100644
--- a/p2p/examples/peer.rs
+++ b/p2p/examples/peer.rs
@@ -1,13 +1,16 @@
+mod shared;
+
 use std::sync::Arc;
 
 use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
 
 use karyons_net::{Endpoint, Port};
 
 use karyons_p2p::{Backend, Config, PeerID};
 
+use shared::run_executor;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
@@ -50,33 +53,27 @@ fn main() {
         ..Default::default()
     };
 
+    // Create a new Executor
+    let ex = Arc::new(Executor::new());
+
     // Create a new Backend
-    let backend = Backend::new(peer_id, config);
+    let backend = Backend::new(peer_id, config, ex.clone());
 
     let (ctrlc_s, ctrlc_r) = channel::unbounded();
     let handle = move || ctrlc_s.try_send(()).unwrap();
     ctrlc::set_handler(handle).unwrap();
 
-    let (signal, shutdown) = channel::unbounded::<()>();
-
-    // Create a new Executor
-    let ex = Arc::new(Executor::new());
-
-    let task = async {
-        // Run the backend
-        backend.run(ex.clone()).await.unwrap();
+    run_executor(
+        async {
+            // Run the backend
+            backend.run().await.unwrap();
 
-        // Wait for ctrlc signal
-        ctrlc_r.recv().await.unwrap();
-
-        // Shutdown the backend
-        backend.shutdown().await;
-
-        drop(signal);
-    };
+            // Wait for ctrlc signal
+            ctrlc_r.recv().await.unwrap();
 
-    // Run four executor threads.
-    Parallel::new()
-        .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
-        .finish(|| future::block_on(task));
+            // Shutdown the backend
+            backend.shutdown().await;
+        },
+        ex,
+    );
 }
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
new file mode 100644
index 0000000..9a8e387
--- /dev/null
+++ b/p2p/examples/shared/mod.rs
@@ -0,0 +1,33 @@
+use std::{num::NonZeroUsize, thread};
+
+use easy_parallel::Parallel;
+use smol::{channel, future, future::Future};
+
+use karyons_core::Executor;
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// see `std::thread::available_parallelism`
+fn available_parallelism() -> usize {
+    thread::available_parallelism()
+        .map(NonZeroUsize::get)
+        .unwrap_or(1)
+}
+
+/// Run a multi-threaded executor
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) {
+    let (signal, shutdown) = channel::unbounded::<()>();
+
+    let num_threads = available_parallelism();
+
+    Parallel::new()
+        .each(0..(num_threads), |_| {
+            future::block_on(ex.run(shutdown.recv()))
+        })
+        // Run the main future on the current thread.
+        .finish(|| {
+            future::block_on(async {
+                main_future.await;
+                drop(signal);
+            })
+        });
+}
-- 
cgit v1.2.3