aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3 /p2p/examples
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
Diffstat (limited to 'p2p/examples')
-rw-r--r--p2p/examples/chat.rs44
-rw-r--r--p2p/examples/monitor.rs60
-rw-r--r--p2p/examples/peer.rs43
-rw-r--r--p2p/examples/shared/mod.rs33
4 files changed, 106 insertions, 74 deletions
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 4358362..907ba06 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -1,9 +1,11 @@
+mod shared;
+
use std::sync::Arc;
use async_std::io;
use async_trait::async_trait;
use clap::Parser;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
@@ -12,6 +14,8 @@ use karyons_p2p::{
ArcPeer, Backend, Config, P2pError, PeerID, Version,
};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -109,33 +113,33 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let ex_cloned = ex.clone();
- let task = ex.spawn(async {
- let username = cli.username;
-
- // Attach the ChatProtocol
- let c = move |peer| ChatProtocol::new(&username, peer);
- backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+ run_executor(
+ async {
+ let username = cli.username;
- // Run the backend
- backend.run(ex_cloned).await.unwrap();
+ // Attach the ChatProtocol
+ let c = move |peer| ChatProtocol::new(&username, peer);
+ backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
- });
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- future::block_on(ex.run(task));
+ // Shutdown the backend
+ backend.shutdown().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index cd4defc..fc48c2f 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -1,13 +1,16 @@
+mod shared;
+
use std::sync::Arc;
use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
use karyons_p2p::{Backend, Config, PeerID};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -50,44 +53,39 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let (signal, shutdown) = channel::unbounded::<()>();
-
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let task = async {
- let monitor = backend.monitor().await;
+ let exc = ex.clone();
+ run_executor(
+ async {
+ let monitor = backend.monitor().await;
- let monitor_task = ex.spawn(async move {
- loop {
- let event = monitor.recv().await.unwrap();
- println!("{}", event);
- }
- });
+ let monitor_task = exc.spawn(async move {
+ loop {
+ let event = monitor.recv().await.unwrap();
+ println!("{}", event);
+ }
+ });
- // Run the backend
- backend.run(ex.clone()).await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
-
- monitor_task.cancel().await;
-
- drop(signal);
- };
+ // Shutdown the backend
+ backend.shutdown().await;
- // Run four executor threads.
- Parallel::new()
- .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
- .finish(|| future::block_on(task));
+ monitor_task.cancel().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs
index f805d68..5ff365d 100644
--- a/p2p/examples/peer.rs
+++ b/p2p/examples/peer.rs
@@ -1,13 +1,16 @@
+mod shared;
+
use std::sync::Arc;
use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
use karyons_p2p::{Backend, Config, PeerID};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -50,33 +53,27 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let (signal, shutdown) = channel::unbounded::<()>();
-
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let task = async {
- // Run the backend
- backend.run(ex.clone()).await.unwrap();
+ run_executor(
+ async {
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
-
- // Shutdown the backend
- backend.shutdown().await;
-
- drop(signal);
- };
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Run four executor threads.
- Parallel::new()
- .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
- .finish(|| future::block_on(task));
+ // Shutdown the backend
+ backend.shutdown().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
new file mode 100644
index 0000000..9a8e387
--- /dev/null
+++ b/p2p/examples/shared/mod.rs
@@ -0,0 +1,33 @@
+use std::{num::NonZeroUsize, thread};
+
+use easy_parallel::Parallel;
+use smol::{channel, future, future::Future};
+
+use karyons_core::Executor;
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// see `std::thread::available_parallelism`
+fn available_parallelism() -> usize {
+ thread::available_parallelism()
+ .map(NonZeroUsize::get)
+ .unwrap_or(1)
+}
+
+/// Run a multi-threaded executor
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) {
+ let (signal, shutdown) = channel::unbounded::<()>();
+
+ let num_threads = available_parallelism();
+
+ Parallel::new()
+ .each(0..(num_threads), |_| {
+ future::block_on(ex.run(shutdown.recv()))
+ })
+ // Run the main future on the current thread.
+ .finish(|| {
+ future::block_on(async {
+ main_future.await;
+ drop(signal);
+ })
+ });
+}