From 78884caca030104557ca277dd3a41cefb70f5be8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 15 Nov 2023 17:16:39 +0300 Subject: improve the TaskGroup API the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around. --- p2p/examples/chat.rs | 44 ++++++++++++++++++---------------- p2p/examples/monitor.rs | 60 ++++++++++++++++++++++------------------------ p2p/examples/peer.rs | 43 ++++++++++++++++----------------- p2p/examples/shared/mod.rs | 33 +++++++++++++++++++++++++ 4 files changed, 106 insertions(+), 74 deletions(-) create mode 100644 p2p/examples/shared/mod.rs (limited to 'p2p/examples') diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 4358362..907ba06 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -1,9 +1,11 @@ +mod shared; + use std::sync::Arc; use async_std::io; use async_trait::async_trait; use clap::Parser; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; @@ -12,6 +14,8 @@ use karyons_p2p::{ ArcPeer, Backend, Config, P2pError, PeerID, Version, }; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -109,33 +113,33 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let ex_cloned = ex.clone(); - let task = ex.spawn(async { - let username = cli.username; - - // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer); - backend.attach_protocol::(c).await.unwrap(); + run_executor( + async { + let username = cli.username; - // Run the backend - backend.run(ex_cloned).await.unwrap(); + // Attach the ChatProtocol + let c = move |peer| ChatProtocol::new(&username, peer); + backend.attach_protocol::(c).await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; - }); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - future::block_on(ex.run(task)); + // Shutdown the backend + backend.shutdown().await; + }, + ex, + ); } diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index cd4defc..fc48c2f 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,13 +1,16 @@ +mod shared; + use std::sync::Arc; use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -50,44 +53,39 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let task = async { - let monitor = backend.monitor().await; + let exc = ex.clone(); + run_executor( + async { + let monitor = backend.monitor().await; - let monitor_task = ex.spawn(async move { - loop { - let event = monitor.recv().await.unwrap(); - println!("{}", event); - } - }); + let monitor_task = exc.spawn(async move { + loop { + let event = monitor.recv().await.unwrap(); + println!("{}", event); + } + }); - // Run the backend - backend.run(ex.clone()).await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; - - monitor_task.cancel().await; - - drop(signal); - }; + // Shutdown the backend + backend.shutdown().await; - // Run four executor threads. - Parallel::new() - .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| future::block_on(task)); + monitor_task.cancel().await; + }, + ex, + ); } diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index f805d68..5ff365d 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -1,13 +1,16 @@ +mod shared; + use std::sync::Arc; use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -50,33 +53,27 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let task = async { - // Run the backend - backend.run(ex.clone()).await.unwrap(); + run_executor( + async { + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); - - // Shutdown the backend - backend.shutdown().await; - - drop(signal); - }; + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Run four executor threads. - Parallel::new() - .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| future::block_on(task)); + // Shutdown the backend + backend.shutdown().await; + }, + ex, + ); } diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs new file mode 100644 index 0000000..9a8e387 --- /dev/null +++ b/p2p/examples/shared/mod.rs @@ -0,0 +1,33 @@ +use std::{num::NonZeroUsize, thread}; + +use easy_parallel::Parallel; +use smol::{channel, future, future::Future}; + +use karyons_core::Executor; + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +fn available_parallelism() -> usize { + thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor(main_future: impl Future, ex: Executor<'_>) { + let (signal, shutdown) = channel::unbounded::<()>(); + + let num_threads = available_parallelism(); + + Parallel::new() + .each(0..(num_threads), |_| { + future::block_on(ex.run(shutdown.recv())) + }) + // Run the main future on the current thread. + .finish(|| { + future::block_on(async { + main_future.await; + drop(signal); + }) + }); +} -- cgit v1.2.3