aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/src/async_utils/task_group.rs73
-rw-r--r--core/src/executor.rs28
-rw-r--r--core/src/lib.rs10
-rw-r--r--net/src/lib.rs4
-rw-r--r--p2p/examples/chat.rs44
-rw-r--r--p2p/examples/monitor.rs60
-rw-r--r--p2p/examples/peer.rs43
-rw-r--r--p2p/examples/shared/mod.rs33
-rw-r--r--p2p/src/backend.rs23
-rw-r--r--p2p/src/connector.rs17
-rw-r--r--p2p/src/discovery/lookup.rs15
-rw-r--r--p2p/src/discovery/mod.rs51
-rw-r--r--p2p/src/discovery/refresh.rs38
-rw-r--r--p2p/src/error.rs2
-rw-r--r--p2p/src/listener.rs38
-rw-r--r--p2p/src/message.rs2
-rw-r--r--p2p/src/monitor.rs12
-rw-r--r--p2p/src/peer/mod.rs16
-rw-r--r--p2p/src/peer_pool.rs27
-rw-r--r--p2p/src/protocol.rs5
-rw-r--r--p2p/src/protocols/ping.rs10
-rw-r--r--p2p/src/utils/version.rs2
23 files changed, 317 insertions, 237 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index caa3ed5..ab05288 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -15,3 +15,4 @@ chrono = "0.4.30"
rand = "0.8.5"
thiserror = "1.0.47"
dirs = "5.0.1"
+async-task = "4.5.0"
diff --git a/core/src/async_utils/task_group.rs b/core/src/async_utils/task_group.rs
index 8707d0e..afc9648 100644
--- a/core/src/async_utils/task_group.rs
+++ b/core/src/async_utils/task_group.rs
@@ -1,6 +1,6 @@
use std::{future::Future, sync::Arc, sync::Mutex};
-use smol::Task;
+use async_task::FallibleTask;
use crate::Executor;
@@ -19,9 +19,9 @@ use super::{select, CondWait, Either};
/// async {
///
/// let ex = Arc::new(smol::Executor::new());
-/// let group = TaskGroup::new();
+/// let group = TaskGroup::new(ex);
///
-/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {});
+/// group.spawn(smol::Timer::never(), |_| async {});
///
/// group.cancel().await;
///
@@ -29,35 +29,38 @@ use super::{select, CondWait, Either};
///
/// ```
///
-pub struct TaskGroup {
+pub struct TaskGroup<'a> {
tasks: Mutex<Vec<TaskHandler>>,
stop_signal: Arc<CondWait>,
+ executor: Executor<'a>,
}
-impl<'a> TaskGroup {
+impl<'a> TaskGroup<'a> {
/// Creates a new task group
- pub fn new() -> Self {
+ pub fn new(executor: Executor<'a>) -> Self {
Self {
tasks: Mutex::new(Vec::new()),
stop_signal: Arc::new(CondWait::new()),
+ executor,
}
}
/// Spawns a new task and calls the callback after it has completed
/// or been canceled. The callback will have the `TaskResult` as a
/// parameter, indicating whether the task completed or was canceled.
- pub fn spawn<T, Fut, CallbackF, CallbackFut>(
- &self,
- executor: Executor<'a>,
- fut: Fut,
- callback: CallbackF,
- ) where
+ pub fn spawn<T, Fut, CallbackF, CallbackFut>(&self, fut: Fut, callback: CallbackF)
+ where
T: Send + Sync + 'a,
Fut: Future<Output = T> + Send + 'a,
CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a,
CallbackFut: Future<Output = ()> + Send + 'a,
{
- let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone());
+ let task = TaskHandler::new(
+ self.executor.clone(),
+ fut,
+ callback,
+ self.stop_signal.clone(),
+ );
self.tasks.lock().unwrap().push(task);
}
@@ -86,12 +89,6 @@ impl<'a> TaskGroup {
}
}
-impl Default for TaskGroup {
- fn default() -> Self {
- Self::new()
- }
-}
-
/// The result of a spawned task.
#[derive(Debug)]
pub enum TaskResult<T> {
@@ -110,7 +107,7 @@ impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> {
/// TaskHandler
pub struct TaskHandler {
- task: Task<()>,
+ task: FallibleTask<()>,
cancel_flag: Arc<CondWait>,
}
@@ -130,21 +127,23 @@ impl<'a> TaskHandler {
{
let cancel_flag = Arc::new(CondWait::new());
let cancel_flag_c = cancel_flag.clone();
- let task = ex.spawn(async move {
- //start_signal.signal().await;
- // Waits for either the stop signal or the task to complete.
- let result = select(stop_signal.wait(), fut).await;
+ let task = ex
+ .spawn(async move {
+ //start_signal.signal().await;
+ // Waits for either the stop signal or the task to complete.
+ let result = select(stop_signal.wait(), fut).await;
- let result = match result {
- Either::Left(_) => TaskResult::Cancelled,
- Either::Right(res) => TaskResult::Completed(res),
- };
+ let result = match result {
+ Either::Left(_) => TaskResult::Cancelled,
+ Either::Right(res) => TaskResult::Completed(res),
+ };
- // Call the callback with the result.
- callback(result).await;
+ // Call the callback with the result.
+ callback(result).await;
- cancel_flag_c.signal().await;
- });
+ cancel_flag_c.signal().await;
+ })
+ .fallible();
TaskHandler { task, cancel_flag }
}
@@ -165,22 +164,20 @@ mod tests {
fn test_task_group() {
let ex = Arc::new(smol::Executor::new());
smol::block_on(ex.clone().run(async move {
- let group = Arc::new(TaskGroup::new());
+ let group = Arc::new(TaskGroup::new(ex));
- group.spawn(ex.clone(), future::ready(0), |res| async move {
+ group.spawn(future::ready(0), |res| async move {
assert!(matches!(res, TaskResult::Completed(0)));
});
- group.spawn(ex.clone(), future::pending::<()>(), |res| async move {
+ group.spawn(future::pending::<()>(), |res| async move {
assert!(matches!(res, TaskResult::Cancelled));
});
let groupc = group.clone();
- let exc = ex.clone();
group.spawn(
- ex.clone(),
async move {
- groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move {
+ groupc.spawn(future::pending::<()>(), |res| async move {
assert!(matches!(res, TaskResult::Cancelled));
});
},
diff --git a/core/src/executor.rs b/core/src/executor.rs
new file mode 100644
index 0000000..136f6ea
--- /dev/null
+++ b/core/src/executor.rs
@@ -0,0 +1,28 @@
+
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// see `std::thread::available_parallelism`
+fn available_parallelism() -> usize {
+ thread::available_parallelism()
+ .map(NonZeroUsize::get)
+ .unwrap_or(1)
+}
+
+/// Run a multi-threaded executor
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) {
+ let (signal, shutdown) = channel::unbounded::<()>();
+
+ let num_threads = available_parallelism();
+
+ Parallel::new()
+ .each(0..(num_threads), |_| {
+ future::block_on(ex.run(shutdown.recv()))
+ })
+ // Run the main future on the current thread.
+ .finish(|| {
+ future::block_on(async {
+ main_future.await;
+ drop(signal);
+ })
+ });
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 83af888..fef7459 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -4,7 +4,7 @@ pub mod utils;
/// A module containing async utilities that work with the `smol` async runtime.
pub mod async_utils;
-/// Represents Karyons's Core Error.
+/// Represents karyons's Core Error.
pub mod error;
/// [`EventSys`](./event/struct.EventSys.html) Implementation
@@ -13,9 +13,13 @@ pub mod event;
/// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html)
pub mod pubsub;
-use error::Result;
use smol::Executor as SmolEx;
use std::sync::Arc;
-/// A wrapper for smol::Executor
+/// A pointer to an Executor
pub type Executor<'a> = Arc<SmolEx<'a>>;
+
+/// A Global Executor
+pub type GlobalExecutor = Arc<SmolEx<'static>>;
+
+use error::Result;
diff --git a/net/src/lib.rs b/net/src/lib.rs
index 914c6d8..0e4c361 100644
--- a/net/src/lib.rs
+++ b/net/src/lib.rs
@@ -17,8 +17,8 @@ pub use {
use error::{Error, Result};
-/// Represents Karyons's Net Error
+/// Represents karyons's Net Error
pub use error::Error as NetError;
-/// Represents Karyons's Net Result
+/// Represents karyons's Net Result
pub use error::Result as NetResult;
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 4358362..907ba06 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -1,9 +1,11 @@
+mod shared;
+
use std::sync::Arc;
use async_std::io;
use async_trait::async_trait;
use clap::Parser;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
@@ -12,6 +14,8 @@ use karyons_p2p::{
ArcPeer, Backend, Config, P2pError, PeerID, Version,
};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -109,33 +113,33 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let ex_cloned = ex.clone();
- let task = ex.spawn(async {
- let username = cli.username;
-
- // Attach the ChatProtocol
- let c = move |peer| ChatProtocol::new(&username, peer);
- backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+ run_executor(
+ async {
+ let username = cli.username;
- // Run the backend
- backend.run(ex_cloned).await.unwrap();
+ // Attach the ChatProtocol
+ let c = move |peer| ChatProtocol::new(&username, peer);
+ backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
- });
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- future::block_on(ex.run(task));
+ // Shutdown the backend
+ backend.shutdown().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index cd4defc..fc48c2f 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -1,13 +1,16 @@
+mod shared;
+
use std::sync::Arc;
use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
use karyons_p2p::{Backend, Config, PeerID};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -50,44 +53,39 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let (signal, shutdown) = channel::unbounded::<()>();
-
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let task = async {
- let monitor = backend.monitor().await;
+ let exc = ex.clone();
+ run_executor(
+ async {
+ let monitor = backend.monitor().await;
- let monitor_task = ex.spawn(async move {
- loop {
- let event = monitor.recv().await.unwrap();
- println!("{}", event);
- }
- });
+ let monitor_task = exc.spawn(async move {
+ loop {
+ let event = monitor.recv().await.unwrap();
+ println!("{}", event);
+ }
+ });
- // Run the backend
- backend.run(ex.clone()).await.unwrap();
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Shutdown the backend
- backend.shutdown().await;
-
- monitor_task.cancel().await;
-
- drop(signal);
- };
+ // Shutdown the backend
+ backend.shutdown().await;
- // Run four executor threads.
- Parallel::new()
- .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
- .finish(|| future::block_on(task));
+ monitor_task.cancel().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs
index f805d68..5ff365d 100644
--- a/p2p/examples/peer.rs
+++ b/p2p/examples/peer.rs
@@ -1,13 +1,16 @@
+mod shared;
+
use std::sync::Arc;
use clap::Parser;
-use easy_parallel::Parallel;
-use smol::{channel, future, Executor};
+use smol::{channel, Executor};
use karyons_net::{Endpoint, Port};
use karyons_p2p::{Backend, Config, PeerID};
+use shared::run_executor;
+
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -50,33 +53,27 @@ fn main() {
..Default::default()
};
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
// Create a new Backend
- let backend = Backend::new(peer_id, config);
+ let backend = Backend::new(peer_id, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
ctrlc::set_handler(handle).unwrap();
- let (signal, shutdown) = channel::unbounded::<()>();
-
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- let task = async {
- // Run the backend
- backend.run(ex.clone()).await.unwrap();
+ run_executor(
+ async {
+ // Run the backend
+ backend.run().await.unwrap();
- // Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
-
- // Shutdown the backend
- backend.shutdown().await;
-
- drop(signal);
- };
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
- // Run four executor threads.
- Parallel::new()
- .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
- .finish(|| future::block_on(task));
+ // Shutdown the backend
+ backend.shutdown().await;
+ },
+ ex,
+ );
}
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
new file mode 100644
index 0000000..9a8e387
--- /dev/null
+++ b/p2p/examples/shared/mod.rs
@@ -0,0 +1,33 @@
+use std::{num::NonZeroUsize, thread};
+
+use easy_parallel::Parallel;
+use smol::{channel, future, future::Future};
+
+use karyons_core::Executor;
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// see `std::thread::available_parallelism`
+fn available_parallelism() -> usize {
+ thread::available_parallelism()
+ .map(NonZeroUsize::get)
+ .unwrap_or(1)
+}
+
+/// Run a multi-threaded executor
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) {
+ let (signal, shutdown) = channel::unbounded::<()>();
+
+ let num_threads = available_parallelism();
+
+ Parallel::new()
+ .each(0..(num_threads), |_| {
+ future::block_on(ex.run(shutdown.recv()))
+ })
+ // Run the main future on the current thread.
+ .finish(|| {
+ future::block_on(async {
+ main_future.await;
+ drop(signal);
+ })
+ });
+}
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index bb18f06..bb0d891 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use log::info;
-use karyons_core::{pubsub::Subscription, Executor};
+use karyons_core::{pubsub::Subscription, GlobalExecutor};
use crate::{
config::Config,
@@ -34,15 +34,16 @@ pub type ArcBackend = Arc<Backend>;
/// // Create the configuration for the backend.
/// let mut config = Config::default();
///
-/// // Create a new Backend
-/// let backend = Backend::new(peer_id, config);
///
/// // Create a new Executor
/// let ex = Arc::new(Executor::new());
///
+/// // Create a new Backend
+/// let backend = Backend::new(peer_id, config, ex.clone());
+///
/// let task = async {
/// // Run the backend
-/// backend.run(ex.clone()).await.unwrap();
+/// backend.run().await.unwrap();
///
/// // ....
///
@@ -72,14 +73,14 @@ pub struct Backend {
impl Backend {
/// Creates a new Backend.
- pub fn new(id: PeerID, config: Config) -> ArcBackend {
+ pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new());
+ let cq = ConnQueue::new();
- let conn_queue = ConnQueue::new();
+ let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone());
- let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone());
- let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone());
+ let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex);
Arc::new(Self {
id: id.clone(),
@@ -91,10 +92,10 @@ impl Backend {
}
/// Run the Backend, starting the PeerPool and Discovery instances.
- pub async fn run(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn run(self: &Arc<Self>) -> Result<()> {
info!("Run the backend {}", self.id);
- self.peer_pool.start(ex.clone()).await?;
- self.discovery.start(ex.clone()).await?;
+ self.peer_pool.start().await?;
+ self.discovery.start().await?;
Ok(())
}
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index 3932c41..f41ab57 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -4,7 +4,7 @@ use log::{trace, warn};
use karyons_core::{
async_utils::{Backoff, TaskGroup, TaskResult},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{dial, Conn, Endpoint, NetError};
@@ -17,7 +17,7 @@ use crate::{
/// Responsible for creating outbound connections with other peers.
pub struct Connector {
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
/// Manages available outbound slots.
connection_slots: Arc<ConnectionSlots>,
@@ -36,9 +36,10 @@ impl Connector {
max_retries: usize,
connection_slots: Arc<ConnectionSlots>,
monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
) -> Arc<Self> {
Arc::new(Self {
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
monitor,
connection_slots,
max_retries,
@@ -92,14 +93,13 @@ impl Connector {
/// Establish a connection to the given `endpoint`. For each new connection,
/// it invokes the provided `callback`, and pass the connection to the callback.
- pub async fn connect_with_cback<'a, Fut>(
+ pub async fn connect_with_cback<Fut>(
self: &Arc<Self>,
- ex: Executor<'a>,
endpoint: &Endpoint,
- callback: impl FnOnce(Conn) -> Fut + Send + 'a,
+ callback: impl FnOnce(Conn) -> Fut + Send + 'static,
) -> Result<()>
where
- Fut: Future<Output = Result<()>> + Send + 'a,
+ Fut: Future<Output = Result<()>> + Send + 'static,
{
let conn = self.connect(endpoint).await?;
@@ -116,8 +116,7 @@ impl Connector {
selfc.connection_slots.remove().await;
};
- self.task_group
- .spawn(ex.clone(), callback(conn), on_disconnect);
+ self.task_group.spawn(callback(conn), on_disconnect);
Ok(())
}
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 94da900..52aa339 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,7 +5,7 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyons_core::{async_utils::timeout, utils::decode, Executor};
+use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor};
use karyons_net::{Conn, Endpoint};
@@ -59,15 +59,18 @@ impl LookupService {
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
) -> Self {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
+ let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
+
let connector = Connector::new(
config.lookup_connect_retries,
outbound_slots.clone(),
monitor.clone(),
+ ex,
);
let listen_endpoint = config
@@ -88,8 +91,8 @@ impl LookupService {
}
/// Start the lookup service.
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
- self.start_listener(ex).await?;
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
+ self.start_listener().await?;
Ok(())
}
@@ -233,7 +236,7 @@ impl LookupService {
}
/// Start a listener.
- async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn start_listener(self: &Arc<Self>) -> Result<()> {
let addr = match &self.listen_endpoint {
Some(a) => a.read().await.addr()?.clone(),
None => return Ok(()),
@@ -248,7 +251,7 @@ impl LookupService {
Ok(())
};
- self.listener.start(ex, endpoint.clone(), callback).await?;
+ self.listener.start(endpoint.clone(), callback).await?;
Ok(())
}
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7b8e7dc..7d37eec 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -9,7 +9,7 @@ use smol::lock::Mutex;
use karyons_core::{
async_utils::{Backoff, TaskGroup, TaskResult},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{Conn, Endpoint};
@@ -57,7 +57,7 @@ pub struct Discovery {
pub(crate) outbound_slots: Arc<ConnectionSlots>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -70,6 +70,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
@@ -77,16 +78,23 @@ impl Discovery {
let table_key = peer_id.0;
let table = Arc::new(Mutex::new(RoutingTable::new(table_key)));
- let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone());
- let lookup_service =
- LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone());
+ let refresh_service =
+ RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
+ let lookup_service = LookupService::new(
+ peer_id,
+ table.clone(),
+ config.clone(),
+ monitor.clone(),
+ ex.clone(),
+ );
let connector = Connector::new(
config.max_connect_retries,
outbound_slots.clone(),
monitor.clone(),
+ ex.clone(),
);
- let listener = Listener::new(inbound_slots.clone(), monitor.clone());
+ let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
Arc::new(Self {
refresh_service: Arc::new(refresh_service),
@@ -97,13 +105,13 @@ impl Discovery {
outbound_slots,
connector,
listener,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
config,
})
}
/// Start the Discovery
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
// Check if the listen_endpoint is provided, and if so, start a listener.
if let Some(endpoint) = &self.config.listen_endpoint {
// Return an error if the discovery port is set to 0.
@@ -113,7 +121,7 @@ impl Discovery {
));
}
- let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?;
+ let resolved_endpoint = self.start_listener(endpoint).await?;
if endpoint.addr()? != resolved_endpoint.addr()? {
info!("Resolved listen endpoint: {resolved_endpoint}");
@@ -127,19 +135,19 @@ impl Discovery {
}
// Start the lookup service
- self.lookup_service.start(ex.clone()).await?;
+ self.lookup_service.start().await?;
// Start the refresh service
- self.refresh_service.start(ex.clone()).await?;
+ self.refresh_service.start().await?;
// Attempt to manually connect to peer endpoints provided in the Config.
for endpoint in self.config.peer_endpoints.iter() {
- let _ = self.connect(endpoint, None, ex.clone()).await;
+ let _ = self.connect(endpoint, None).await;
}
// Start connect loop
let selfc = self.clone();
self.task_group
- .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move {
+ .spawn(selfc.connect_loop(), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Connect loop stopped: {err}");
}
@@ -159,18 +167,14 @@ impl Discovery {
}
/// Start a listener and on success, return the resolved endpoint.
- async fn start_listener(
- self: &Arc<Self>,
- endpoint: &Endpoint,
- ex: Executor<'_>,
- ) -> Result<Endpoint> {
+ async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
let callback = |conn: Conn| async move {
selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;
Ok(())
};
- let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?;
+ let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
Ok(resolved_endpoint)
}
@@ -180,7 +184,7 @@ impl Discovery {
///
/// This will perform a backoff to prevent getting stuck in the loop
/// if the seeding process couldn't find any peers.
- async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
let random_entry = self.random_entry(PENDING_ENTRY).await;
@@ -188,8 +192,7 @@ impl Discovery {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
- self.connect(&endpoint, Some(entry.key.into()), ex.clone())
- .await;
+ self.connect(&endpoint, Some(entry.key.into())).await;
}
None => {
backoff.sleep().await;
@@ -200,7 +203,7 @@ impl Discovery {
}
/// Connect to the given endpoint using the connector
- async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) {
+ async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
let selfc = self.clone();
let pid_cloned = pid.clone();
let cback = |conn: Conn| async move {
@@ -211,7 +214,7 @@ impl Discovery {
Ok(())
};
- let res = self.connector.connect_with_cback(ex, endpoint, cback).await;
+ let res = self.connector.connect_with_cback(endpoint, cback).await;
if let Some(pid) = &pid {
match res {
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 7582c84..a708261 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -12,7 +12,7 @@ use smol::{
use karyons_core::{
async_utils::{timeout, Backoff, TaskGroup, TaskResult},
utils::{decode, encode},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn};
@@ -43,7 +43,10 @@ pub struct RefreshService {
listen_endpoint: Option<RwLock<Endpoint>>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
+
+ /// A global executor
+ executor: GlobalExecutor,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -58,6 +61,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
+ executor: GlobalExecutor,
) -> Self {
let listen_endpoint = config
.listen_endpoint
@@ -67,41 +71,36 @@ impl RefreshService {
Self {
table,
listen_endpoint,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(executor.clone()),
+ executor,
config,
monitor,
}
}
/// Start the refresh service
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
if let Some(endpoint) = &self.listen_endpoint {
let endpoint = endpoint.read().await;
let addr = endpoint.addr()?;
let port = self.config.discovery_port;
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(addr.clone(), port),
- |res| async move {
+ self.task_group
+ .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {err}");
}
- },
- );
+ });
}
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.refresh_loop(ex.clone()),
- |res| async move {
+ self.task_group
+ .spawn(selfc.refresh_loop(), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Refresh loop stopped: {err}");
}
- },
- );
+ });
Ok(())
}
@@ -121,7 +120,7 @@ impl RefreshService {
/// Initiates periodic refreshing of the routing table. This function will
/// select 8 random entries from each bucket in the routing table and start
/// sending Ping messages to the entries.
- async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ async fn refresh_loop(self: Arc<Self>) -> Result<()> {
let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
timer.next().await;
@@ -140,13 +139,14 @@ impl RefreshService {
}
drop(table);
- self.clone().do_refresh(&entries, ex.clone()).await;
+ self.clone().do_refresh(&entries).await;
}
}
/// Iterates over the entries and spawns a new task for each entry to
/// initiate a connection attempt to that entry.
- async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) {
+ async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) {
+ let ex = &self.executor;
for chunk in entries.chunks(16) {
let mut tasks = Vec::new();
for bucket_entry in chunk {
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index 945e90a..91d2c39 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -2,7 +2,7 @@ use thiserror::Error as ThisError;
pub type Result<T> = std::result::Result<T, Error>;
-/// Represents Karyons's p2p Error.
+/// Represents karyons's p2p Error.
#[derive(ThisError, Debug)]
pub enum Error {
#[error("IO Error: {0}")]
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index ee92536..f2391f7 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -4,7 +4,7 @@ use log::{error, info, trace};
use karyons_core::{
async_utils::{TaskGroup, TaskResult},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
@@ -18,7 +18,7 @@ use crate::{
/// Responsible for creating inbound connections with other peers.
pub struct Listener {
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
/// Manages available inbound slots.
connection_slots: Arc<ConnectionSlots>,
@@ -29,10 +29,14 @@ pub struct Listener {
impl Listener {
/// Creates a new Listener
- pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> {
+ pub fn new(
+ connection_slots: Arc<ConnectionSlots>,
+ monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
+ ) -> Arc<Self> {
Arc::new(Self {
connection_slots,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
monitor,
})
}
@@ -42,15 +46,14 @@ impl Listener {
/// connection to the callback.
///
/// Returns the resloved listening endpoint.
- pub async fn start<'a, Fut>(
+ pub async fn start<Fut>(
self: &Arc<Self>,
- ex: Executor<'a>,
endpoint: Endpoint,
// https://github.com/rust-lang/rfcs/pull/2132
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
) -> Result<Endpoint>
where
- Fut: Future<Output = Result<()>> + Send + 'a,
+ Fut: Future<Output = Result<()>> + Send + 'static,
{
let listener = match listen(&endpoint).await {
Ok(listener) => {
@@ -73,15 +76,12 @@ impl Listener {
info!("Start listening on {endpoint}");
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(ex.clone(), listener, callback),
- |res| async move {
+ self.task_group
+ .spawn(selfc.listen_loop(listener, callback), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {endpoint} {err}");
}
- },
- );
+ });
Ok(resolved_endpoint)
}
@@ -90,14 +90,13 @@ impl Listener {
self.task_group.cancel().await;
}
- async fn listen_loop<'a, Fut>(
+ async fn listen_loop<Fut>(
self: Arc<Self>,
- ex: Executor<'a>,
listener: Box<dyn NetListener>,
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
) -> Result<()>
where
- Fut: Future<Output = Result<()>> + Send + 'a,
+ Fut: Future<Output = Result<()>> + Send + 'static,
{
loop {
// Wait for an available inbound slot.
@@ -134,8 +133,7 @@ impl Listener {
};
let callback = callback.clone();
- self.task_group
- .spawn(ex.clone(), callback(conn), on_disconnect);
+ self.task_group.spawn(callback(conn), on_disconnect);
}
}
}
diff --git a/p2p/src/message.rs b/p2p/src/message.rs
index cdb9837..d3691c2 100644
--- a/p2p/src/message.rs
+++ b/p2p/src/message.rs
@@ -12,7 +12,7 @@ pub const MSG_HEADER_SIZE: usize = 6;
/// The maximum allowed size for a message in bytes.
pub const MAX_ALLOWED_MSG_SIZE: u32 = 1000000;
-/// Defines the main message in the Karyon P2P network.
+/// Defines the main message in the karyon p2p network.
///
/// This message structure consists of a header and payload, where the header
/// typically contains essential information about the message, and the payload
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs
index ee0bf44..fbbf43f 100644
--- a/p2p/src/monitor.rs
+++ b/p2p/src/monitor.rs
@@ -13,11 +13,19 @@ use karyons_net::Endpoint;
/// # Example
///
/// ```
+/// use std::sync::Arc;
+///
+/// use smol::Executor;
+///
/// use karyons_p2p::{Config, Backend, PeerID};
+///
/// async {
///
-/// let backend = Backend::new(PeerID::random(), Config::default());
-///
+/// // Create a new Executor
+/// let ex = Arc::new(Executor::new());
+///
+/// let backend = Backend::new(PeerID::random(), Config::default(), ex);
+///
/// // Create a new Subscription
/// let sub = backend.monitor().await;
///
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 60e76a1..85cd558 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -14,7 +14,7 @@ use karyons_core::{
async_utils::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
utils::{decode, encode},
- Executor,
+ GlobalExecutor,
};
use karyons_net::Endpoint;
@@ -56,7 +56,7 @@ pub struct Peer {
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
}
impl Peer {
@@ -67,6 +67,7 @@ impl Peer {
io_codec: IOCodec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
+ ex: GlobalExecutor,
) -> ArcPeer {
Arc::new(Peer {
id: id.clone(),
@@ -76,14 +77,14 @@ impl Peer {
remote_endpoint,
conn_direction,
protocol_events: EventSys::new(),
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
stop_chan: channel::bounded(1),
})
}
/// Run the peer
- pub async fn run(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
- self.start_protocols(ex.clone()).await;
+ pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> {
+ self.start_protocols(ex).await;
self.read_loop().await
}
@@ -205,7 +206,7 @@ impl Peer {
}
/// Start running the protocols for this peer connection.
- async fn start_protocols(self: &Arc<Self>, ex: Executor<'_>) {
+ async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) {
for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() {
trace!("peer {} start protocol {protocol_id}", self.id);
let protocol = constructor(self.clone());
@@ -213,7 +214,6 @@ impl Peer {
self.protocol_ids.write().await.push(protocol_id.clone());
let selfc = self.clone();
- let exc = ex.clone();
let proto_idc = protocol_id.clone();
let on_failure = |result: TaskResult<Result<()>>| async move {
@@ -227,7 +227,7 @@ impl Peer {
};
self.task_group
- .spawn(ex.clone(), protocol.start(exc), on_failure);
+ .spawn(protocol.start(ex.clone()), on_failure);
}
}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index 2433cfc..0d17307 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -13,7 +13,7 @@ use smol::{
use karyons_core::{
async_utils::{TaskGroup, TaskResult},
utils::decode,
- Executor,
+ GlobalExecutor,
};
use karyons_net::Conn;
@@ -51,10 +51,13 @@ pub struct PeerPool {
protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>,
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
+
+ /// A global Executor
+ executor: GlobalExecutor,
/// The Configuration for the P2P network.
- pub config: Arc<Config>,
+ pub(crate) config: Arc<Config>,
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
@@ -67,6 +70,7 @@ impl PeerPool {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
+ executor: GlobalExecutor,
) -> Arc<Self> {
let protocols = RwLock::new(HashMap::new());
let protocol_versions = Arc::new(RwLock::new(HashMap::new()));
@@ -77,23 +81,23 @@ impl PeerPool {
peers: Mutex::new(HashMap::new()),
protocols,
protocol_versions,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(executor.clone()),
+ executor,
monitor,
config,
})
}
/// Start
- pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> {
+ pub async fn start(self: &Arc<Self>) -> Result<()> {
self.setup_protocols().await?;
let selfc = self.clone();
- self.task_group
- .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {});
+ self.task_group.spawn(selfc.listen_loop(), |_| async {});
Ok(())
}
/// Listens to a new connection from the connection queue
- pub async fn listen_loop(self: Arc<Self>, ex: Executor<'_>) {
+ pub async fn listen_loop(self: Arc<Self>) {
loop {
let new_conn = self.conn_queue.next().await;
let disconnect_signal = new_conn.disconnect_signal;
@@ -103,7 +107,6 @@ impl PeerPool {
new_conn.conn,
&new_conn.direction,
disconnect_signal.clone(),
- ex.clone(),
)
.await;
@@ -128,7 +131,7 @@ impl PeerPool {
let protocols = &mut self.protocols.write().await;
protocol_versions.insert(P::id(), P::version()?);
- protocols.insert(P::id(), Box::new(c) as Box<ProtocolConstructor>);
+ protocols.insert(P::id(), c);
Ok(())
}
@@ -153,7 +156,6 @@ impl PeerPool {
conn: Conn,
conn_direction: &ConnDirection,
disconnect_signal: Sender<()>,
- ex: Executor<'_>,
) -> Result<PeerID> {
let endpoint = conn.peer_endpoint()?;
let io_codec = IOCodec::new(conn);
@@ -173,6 +175,7 @@ impl PeerPool {
io_codec,
endpoint.clone(),
conn_direction.clone(),
+ self.executor.clone(),
);
// Insert the new peer
@@ -190,7 +193,7 @@ impl PeerPool {
};
self.task_group
- .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect);
+ .spawn(peer.run(self.executor.clone()), on_disconnect);
info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}");
diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs
index 515efc6..770b695 100644
--- a/p2p/src/protocol.rs
+++ b/p2p/src/protocol.rs
@@ -87,8 +87,11 @@ impl EventValue for ProtocolEvent {
/// let peer_id = PeerID::random();
/// let config = Config::default();
///
+/// // Create a new Executor
+/// let ex = Arc::new(Executor::new());
+///
/// // Create a new Backend
-/// let backend = Backend::new(peer_id, config);
+/// let backend = Backend::new(peer_id, config, ex);
///
/// // Attach the NewProtocol
/// let c = move |peer| NewProtocol::new(peer);
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index b337494..dc1b9a1 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -39,7 +39,6 @@ pub struct PingProtocol {
peer: ArcPeer,
ping_interval: u64,
ping_timeout: u64,
- task_group: TaskGroup,
}
impl PingProtocol {
@@ -51,7 +50,6 @@ impl PingProtocol {
peer,
ping_interval,
ping_timeout,
- task_group: TaskGroup::new(),
})
}
@@ -130,12 +128,14 @@ impl PingProtocol {
impl Protocol for PingProtocol {
async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {
trace!("Start Ping protocol");
+
+ let task_group = TaskGroup::new(ex);
+
let (pong_chan, pong_chan_recv) = channel::bounded(1);
let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1);
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
+ task_group.spawn(
selfc.clone().ping_loop(pong_chan_recv.clone()),
|res| async move {
if let TaskResult::Completed(result) = res {
@@ -148,7 +148,7 @@ impl Protocol for PingProtocol {
let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await;
listener.cancel().await;
- self.task_group.cancel().await;
+ task_group.cancel().await;
match result {
Either::Left(res) => {
diff --git a/p2p/src/utils/version.rs b/p2p/src/utils/version.rs
index 4986495..a101b28 100644
--- a/p2p/src/utils/version.rs
+++ b/p2p/src/utils/version.rs
@@ -5,7 +5,7 @@ use semver::VersionReq;
use crate::{Error, Result};
-/// Represents the network version and protocol version used in Karyons p2p.
+/// Represents the network version and protocol version used in karyons p2p.
///
/// # Example
///