diff options
author | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-15 17:16:39 +0300 |
commit | 78884caca030104557ca277dd3a41cefb70f5be8 (patch) | |
tree | c33650dfe44a219e395dff1966d298b58b09acb3 | |
parent | f0729022589ee8e48b5558ab30462f95d06fe6df (diff) |
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling
its spawn method
also, define a global executor `Executor<'static>` and use static
lifetime instead of a lifetime placeholder
This improvement simplify the code for spawning a new task. There is no
need to pass the executor around.
-rw-r--r-- | core/Cargo.toml | 1 | ||||
-rw-r--r-- | core/src/async_utils/task_group.rs | 73 | ||||
-rw-r--r-- | core/src/executor.rs | 28 | ||||
-rw-r--r-- | core/src/lib.rs | 10 | ||||
-rw-r--r-- | net/src/lib.rs | 4 | ||||
-rw-r--r-- | p2p/examples/chat.rs | 44 | ||||
-rw-r--r-- | p2p/examples/monitor.rs | 60 | ||||
-rw-r--r-- | p2p/examples/peer.rs | 43 | ||||
-rw-r--r-- | p2p/examples/shared/mod.rs | 33 | ||||
-rw-r--r-- | p2p/src/backend.rs | 23 | ||||
-rw-r--r-- | p2p/src/connector.rs | 17 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 15 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 51 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 38 | ||||
-rw-r--r-- | p2p/src/error.rs | 2 | ||||
-rw-r--r-- | p2p/src/listener.rs | 38 | ||||
-rw-r--r-- | p2p/src/message.rs | 2 | ||||
-rw-r--r-- | p2p/src/monitor.rs | 12 | ||||
-rw-r--r-- | p2p/src/peer/mod.rs | 16 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 27 | ||||
-rw-r--r-- | p2p/src/protocol.rs | 5 | ||||
-rw-r--r-- | p2p/src/protocols/ping.rs | 10 | ||||
-rw-r--r-- | p2p/src/utils/version.rs | 2 |
23 files changed, 317 insertions, 237 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index caa3ed5..ab05288 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,3 +15,4 @@ chrono = "0.4.30" rand = "0.8.5" thiserror = "1.0.47" dirs = "5.0.1" +async-task = "4.5.0" diff --git a/core/src/async_utils/task_group.rs b/core/src/async_utils/task_group.rs index 8707d0e..afc9648 100644 --- a/core/src/async_utils/task_group.rs +++ b/core/src/async_utils/task_group.rs @@ -1,6 +1,6 @@ use std::{future::Future, sync::Arc, sync::Mutex}; -use smol::Task; +use async_task::FallibleTask; use crate::Executor; @@ -19,9 +19,9 @@ use super::{select, CondWait, Either}; /// async { /// /// let ex = Arc::new(smol::Executor::new()); -/// let group = TaskGroup::new(); +/// let group = TaskGroup::new(ex); /// -/// group.spawn(ex.clone(), smol::Timer::never(), |_| async {}); +/// group.spawn(smol::Timer::never(), |_| async {}); /// /// group.cancel().await; /// @@ -29,35 +29,38 @@ use super::{select, CondWait, Either}; /// /// ``` /// -pub struct TaskGroup { +pub struct TaskGroup<'a> { tasks: Mutex<Vec<TaskHandler>>, stop_signal: Arc<CondWait>, + executor: Executor<'a>, } -impl<'a> TaskGroup { +impl<'a> TaskGroup<'a> { /// Creates a new task group - pub fn new() -> Self { + pub fn new(executor: Executor<'a>) -> Self { Self { tasks: Mutex::new(Vec::new()), stop_signal: Arc::new(CondWait::new()), + executor, } } /// Spawns a new task and calls the callback after it has completed /// or been canceled. The callback will have the `TaskResult` as a /// parameter, indicating whether the task completed or was canceled. - pub fn spawn<T, Fut, CallbackF, CallbackFut>( - &self, - executor: Executor<'a>, - fut: Fut, - callback: CallbackF, - ) where + pub fn spawn<T, Fut, CallbackF, CallbackFut>(&self, fut: Fut, callback: CallbackF) + where T: Send + Sync + 'a, Fut: Future<Output = T> + Send + 'a, CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, CallbackFut: Future<Output = ()> + Send + 'a, { - let task = TaskHandler::new(executor.clone(), fut, callback, self.stop_signal.clone()); + let task = TaskHandler::new( + self.executor.clone(), + fut, + callback, + self.stop_signal.clone(), + ); self.tasks.lock().unwrap().push(task); } @@ -86,12 +89,6 @@ impl<'a> TaskGroup { } } -impl Default for TaskGroup { - fn default() -> Self { - Self::new() - } -} - /// The result of a spawned task. #[derive(Debug)] pub enum TaskResult<T> { @@ -110,7 +107,7 @@ impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> { /// TaskHandler pub struct TaskHandler { - task: Task<()>, + task: FallibleTask<()>, cancel_flag: Arc<CondWait>, } @@ -130,21 +127,23 @@ impl<'a> TaskHandler { { let cancel_flag = Arc::new(CondWait::new()); let cancel_flag_c = cancel_flag.clone(); - let task = ex.spawn(async move { - //start_signal.signal().await; - // Waits for either the stop signal or the task to complete. - let result = select(stop_signal.wait(), fut).await; + let task = ex + .spawn(async move { + //start_signal.signal().await; + // Waits for either the stop signal or the task to complete. + let result = select(stop_signal.wait(), fut).await; - let result = match result { - Either::Left(_) => TaskResult::Cancelled, - Either::Right(res) => TaskResult::Completed(res), - }; + let result = match result { + Either::Left(_) => TaskResult::Cancelled, + Either::Right(res) => TaskResult::Completed(res), + }; - // Call the callback with the result. - callback(result).await; + // Call the callback with the result. + callback(result).await; - cancel_flag_c.signal().await; - }); + cancel_flag_c.signal().await; + }) + .fallible(); TaskHandler { task, cancel_flag } } @@ -165,22 +164,20 @@ mod tests { fn test_task_group() { let ex = Arc::new(smol::Executor::new()); smol::block_on(ex.clone().run(async move { - let group = Arc::new(TaskGroup::new()); + let group = Arc::new(TaskGroup::new(ex)); - group.spawn(ex.clone(), future::ready(0), |res| async move { + group.spawn(future::ready(0), |res| async move { assert!(matches!(res, TaskResult::Completed(0))); }); - group.spawn(ex.clone(), future::pending::<()>(), |res| async move { + group.spawn(future::pending::<()>(), |res| async move { assert!(matches!(res, TaskResult::Cancelled)); }); let groupc = group.clone(); - let exc = ex.clone(); group.spawn( - ex.clone(), async move { - groupc.spawn(exc.clone(), future::pending::<()>(), |res| async move { + groupc.spawn(future::pending::<()>(), |res| async move { assert!(matches!(res, TaskResult::Cancelled)); }); }, diff --git a/core/src/executor.rs b/core/src/executor.rs new file mode 100644 index 0000000..136f6ea --- /dev/null +++ b/core/src/executor.rs @@ -0,0 +1,28 @@ + + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +fn available_parallelism() -> usize { + thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) { + let (signal, shutdown) = channel::unbounded::<()>(); + + let num_threads = available_parallelism(); + + Parallel::new() + .each(0..(num_threads), |_| { + future::block_on(ex.run(shutdown.recv())) + }) + // Run the main future on the current thread. + .finish(|| { + future::block_on(async { + main_future.await; + drop(signal); + }) + }); +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 83af888..fef7459 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -4,7 +4,7 @@ pub mod utils; /// A module containing async utilities that work with the `smol` async runtime. pub mod async_utils; -/// Represents Karyons's Core Error. +/// Represents karyons's Core Error. pub mod error; /// [`EventSys`](./event/struct.EventSys.html) Implementation @@ -13,9 +13,13 @@ pub mod event; /// A simple publish-subscribe system.[`Read More`](./pubsub/struct.Publisher.html) pub mod pubsub; -use error::Result; use smol::Executor as SmolEx; use std::sync::Arc; -/// A wrapper for smol::Executor +/// A pointer to an Executor pub type Executor<'a> = Arc<SmolEx<'a>>; + +/// A Global Executor +pub type GlobalExecutor = Arc<SmolEx<'static>>; + +use error::Result; diff --git a/net/src/lib.rs b/net/src/lib.rs index 914c6d8..0e4c361 100644 --- a/net/src/lib.rs +++ b/net/src/lib.rs @@ -17,8 +17,8 @@ pub use { use error::{Error, Result}; -/// Represents Karyons's Net Error +/// Represents karyons's Net Error pub use error::Error as NetError; -/// Represents Karyons's Net Result +/// Represents karyons's Net Result pub use error::Result as NetResult; diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 4358362..907ba06 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -1,9 +1,11 @@ +mod shared; + use std::sync::Arc; use async_std::io; use async_trait::async_trait; use clap::Parser; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; @@ -12,6 +14,8 @@ use karyons_p2p::{ ArcPeer, Backend, Config, P2pError, PeerID, Version, }; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -109,33 +113,33 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let ex_cloned = ex.clone(); - let task = ex.spawn(async { - let username = cli.username; - - // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer); - backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); + run_executor( + async { + let username = cli.username; - // Run the backend - backend.run(ex_cloned).await.unwrap(); + // Attach the ChatProtocol + let c = move |peer| ChatProtocol::new(&username, peer); + backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; - }); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - future::block_on(ex.run(task)); + // Shutdown the backend + backend.shutdown().await; + }, + ex, + ); } diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index cd4defc..fc48c2f 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,13 +1,16 @@ +mod shared; + use std::sync::Arc; use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -50,44 +53,39 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let task = async { - let monitor = backend.monitor().await; + let exc = ex.clone(); + run_executor( + async { + let monitor = backend.monitor().await; - let monitor_task = ex.spawn(async move { - loop { - let event = monitor.recv().await.unwrap(); - println!("{}", event); - } - }); + let monitor_task = exc.spawn(async move { + loop { + let event = monitor.recv().await.unwrap(); + println!("{}", event); + } + }); - // Run the backend - backend.run(ex.clone()).await.unwrap(); + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Shutdown the backend - backend.shutdown().await; - - monitor_task.cancel().await; - - drop(signal); - }; + // Shutdown the backend + backend.shutdown().await; - // Run four executor threads. - Parallel::new() - .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| future::block_on(task)); + monitor_task.cancel().await; + }, + ex, + ); } diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index f805d68..5ff365d 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -1,13 +1,16 @@ +mod shared; + use std::sync::Arc; use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor}; use karyons_net::{Endpoint, Port}; use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -50,33 +53,27 @@ fn main() { ..Default::default() }; + // Create a new Executor + let ex = Arc::new(Executor::new()); + // Create a new Backend - let backend = Backend::new(peer_id, config); + let backend = Backend::new(peer_id, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); ctrlc::set_handler(handle).unwrap(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Create a new Executor - let ex = Arc::new(Executor::new()); - - let task = async { - // Run the backend - backend.run(ex.clone()).await.unwrap(); + run_executor( + async { + // Run the backend + backend.run().await.unwrap(); - // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); - - // Shutdown the backend - backend.shutdown().await; - - drop(signal); - }; + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); - // Run four executor threads. - Parallel::new() - .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| future::block_on(task)); + // Shutdown the backend + backend.shutdown().await; + }, + ex, + ); } diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs new file mode 100644 index 0000000..9a8e387 --- /dev/null +++ b/p2p/examples/shared/mod.rs @@ -0,0 +1,33 @@ +use std::{num::NonZeroUsize, thread}; + +use easy_parallel::Parallel; +use smol::{channel, future, future::Future}; + +use karyons_core::Executor; + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +fn available_parallelism() -> usize { + thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) { + let (signal, shutdown) = channel::unbounded::<()>(); + + let num_threads = available_parallelism(); + + Parallel::new() + .each(0..(num_threads), |_| { + future::block_on(ex.run(shutdown.recv())) + }) + // Run the main future on the current thread. + .finish(|| { + future::block_on(async { + main_future.await; + drop(signal); + }) + }); +} diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index bb18f06..bb0d891 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use log::info; -use karyons_core::{pubsub::Subscription, Executor}; +use karyons_core::{pubsub::Subscription, GlobalExecutor}; use crate::{ config::Config, @@ -34,15 +34,16 @@ pub type ArcBackend = Arc<Backend>; /// // Create the configuration for the backend. /// let mut config = Config::default(); /// -/// // Create a new Backend -/// let backend = Backend::new(peer_id, config); /// /// // Create a new Executor /// let ex = Arc::new(Executor::new()); /// +/// // Create a new Backend +/// let backend = Backend::new(peer_id, config, ex.clone()); +/// /// let task = async { /// // Run the backend -/// backend.run(ex.clone()).await.unwrap(); +/// backend.run().await.unwrap(); /// /// // .... /// @@ -72,14 +73,14 @@ pub struct Backend { impl Backend { /// Creates a new Backend. - pub fn new(id: PeerID, config: Config) -> ArcBackend { + pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend { let config = Arc::new(config); let monitor = Arc::new(Monitor::new()); + let cq = ConnQueue::new(); - let conn_queue = ConnQueue::new(); + let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone()); - let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone()); - let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone()); + let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex); Arc::new(Self { id: id.clone(), @@ -91,10 +92,10 @@ impl Backend { } /// Run the Backend, starting the PeerPool and Discovery instances. - pub async fn run(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn run(self: &Arc<Self>) -> Result<()> { info!("Run the backend {}", self.id); - self.peer_pool.start(ex.clone()).await?; - self.discovery.start(ex.clone()).await?; + self.peer_pool.start().await?; + self.discovery.start().await?; Ok(()) } diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index 3932c41..f41ab57 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -4,7 +4,7 @@ use log::{trace, warn}; use karyons_core::{ async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{dial, Conn, Endpoint, NetError}; @@ -17,7 +17,7 @@ use crate::{ /// Responsible for creating outbound connections with other peers. pub struct Connector { /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Manages available outbound slots. connection_slots: Arc<ConnectionSlots>, @@ -36,9 +36,10 @@ impl Connector { max_retries: usize, connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>, + ex: GlobalExecutor, ) -> Arc<Self> { Arc::new(Self { - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), monitor, connection_slots, max_retries, @@ -92,14 +93,13 @@ impl Connector { /// Establish a connection to the given `endpoint`. For each new connection, /// it invokes the provided `callback`, and pass the connection to the callback. - pub async fn connect_with_cback<'a, Fut>( + pub async fn connect_with_cback<Fut>( self: &Arc<Self>, - ex: Executor<'a>, endpoint: &Endpoint, - callback: impl FnOnce(Conn) -> Fut + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Send + 'static, ) -> Result<()> where - Fut: Future<Output = Result<()>> + Send + 'a, + Fut: Future<Output = Result<()>> + Send + 'static, { let conn = self.connect(endpoint).await?; @@ -116,8 +116,7 @@ impl Connector { selfc.connection_slots.remove().await; }; - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); + self.task_group.spawn(callback(conn), on_disconnect); Ok(()) } diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 94da900..52aa339 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,7 +5,7 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use smol::lock::{Mutex, RwLock}; -use karyons_core::{async_utils::timeout, utils::decode, Executor}; +use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor}; use karyons_net::{Conn, Endpoint}; @@ -59,15 +59,18 @@ impl LookupService { table: Arc<Mutex<RoutingTable>>, config: Arc<Config>, monitor: Arc<Monitor>, + ex: GlobalExecutor, ) -> Self { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); + let connector = Connector::new( config.lookup_connect_retries, outbound_slots.clone(), monitor.clone(), + ex, ); let listen_endpoint = config @@ -88,8 +91,8 @@ impl LookupService { } /// Start the lookup service. - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { - self.start_listener(ex).await?; + pub async fn start(self: &Arc<Self>) -> Result<()> { + self.start_listener().await?; Ok(()) } @@ -233,7 +236,7 @@ impl LookupService { } /// Start a listener. - async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn start_listener(self: &Arc<Self>) -> Result<()> { let addr = match &self.listen_endpoint { Some(a) => a.read().await.addr()?.clone(), None => return Ok(()), @@ -248,7 +251,7 @@ impl LookupService { Ok(()) }; - self.listener.start(ex, endpoint.clone(), callback).await?; + self.listener.start(endpoint.clone(), callback).await?; Ok(()) } diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7b8e7dc..7d37eec 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -9,7 +9,7 @@ use smol::lock::Mutex; use karyons_core::{ async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{Conn, Endpoint}; @@ -57,7 +57,7 @@ pub struct Discovery { pub(crate) outbound_slots: Arc<ConnectionSlots>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -70,6 +70,7 @@ impl Discovery { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, + ex: GlobalExecutor, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -77,16 +78,23 @@ impl Discovery { let table_key = peer_id.0; let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); - let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); - let lookup_service = - LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); + let refresh_service = + RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); + let lookup_service = LookupService::new( + peer_id, + table.clone(), + config.clone(), + monitor.clone(), + ex.clone(), + ); let connector = Connector::new( config.max_connect_retries, outbound_slots.clone(), monitor.clone(), + ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -97,13 +105,13 @@ impl Discovery { outbound_slots, connector, listener, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), config, }) } /// Start the Discovery - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc<Self>) -> Result<()> { // Check if the listen_endpoint is provided, and if so, start a listener. if let Some(endpoint) = &self.config.listen_endpoint { // Return an error if the discovery port is set to 0. @@ -113,7 +121,7 @@ impl Discovery { )); } - let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; + let resolved_endpoint = self.start_listener(endpoint).await?; if endpoint.addr()? != resolved_endpoint.addr()? { info!("Resolved listen endpoint: {resolved_endpoint}"); @@ -127,19 +135,19 @@ impl Discovery { } // Start the lookup service - self.lookup_service.start(ex.clone()).await?; + self.lookup_service.start().await?; // Start the refresh service - self.refresh_service.start(ex.clone()).await?; + self.refresh_service.start().await?; // Attempt to manually connect to peer endpoints provided in the Config. for endpoint in self.config.peer_endpoints.iter() { - let _ = self.connect(endpoint, None, ex.clone()).await; + let _ = self.connect(endpoint, None).await; } // Start connect loop let selfc = self.clone(); self.task_group - .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { + .spawn(selfc.connect_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Connect loop stopped: {err}"); } @@ -159,18 +167,14 @@ impl Discovery { } /// Start a listener and on success, return the resolved endpoint. - async fn start_listener( - self: &Arc<Self>, - endpoint: &Endpoint, - ex: Executor<'_>, - ) -> Result<Endpoint> { + async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); let callback = |conn: Conn| async move { selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; Ok(()) }; - let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; + let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?; Ok(resolved_endpoint) } @@ -180,7 +184,7 @@ impl Discovery { /// /// This will perform a backoff to prevent getting stuck in the loop /// if the seeding process couldn't find any peers. - async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn connect_loop(self: Arc<Self>) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { let random_entry = self.random_entry(PENDING_ENTRY).await; @@ -188,8 +192,7 @@ impl Discovery { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); - self.connect(&endpoint, Some(entry.key.into()), ex.clone()) - .await; + self.connect(&endpoint, Some(entry.key.into())).await; } None => { backoff.sleep().await; @@ -200,7 +203,7 @@ impl Discovery { } /// Connect to the given endpoint using the connector - async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) { + async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) { let selfc = self.clone(); let pid_cloned = pid.clone(); let cback = |conn: Conn| async move { @@ -211,7 +214,7 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(ex, endpoint, cback).await; + let res = self.connector.connect_with_cback(endpoint, cback).await; if let Some(pid) = &pid { match res { diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 7582c84..a708261 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -12,7 +12,7 @@ use smol::{ use karyons_core::{ async_utils::{timeout, Backoff, TaskGroup, TaskResult}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; @@ -43,7 +43,10 @@ pub struct RefreshService { listen_endpoint: Option<RwLock<Endpoint>>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global executor + executor: GlobalExecutor, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -58,6 +61,7 @@ impl RefreshService { config: Arc<Config>, table: Arc<Mutex<RoutingTable>>, monitor: Arc<Monitor>, + executor: GlobalExecutor, ) -> Self { let listen_endpoint = config .listen_endpoint @@ -67,41 +71,36 @@ impl RefreshService { Self { table, listen_endpoint, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, config, monitor, } } /// Start the refresh service - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc<Self>) -> Result<()> { if let Some(endpoint) = &self.listen_endpoint { let endpoint = endpoint.read().await; let addr = endpoint.addr()?; let port = self.config.discovery_port; let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(addr.clone(), port), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(addr.clone(), port), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {err}"); } - }, - ); + }); } let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.refresh_loop(ex.clone()), - |res| async move { + self.task_group + .spawn(selfc.refresh_loop(), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Refresh loop stopped: {err}"); } - }, - ); + }); Ok(()) } @@ -121,7 +120,7 @@ impl RefreshService { /// Initiates periodic refreshing of the routing table. This function will /// select 8 random entries from each bucket in the routing table and start /// sending Ping messages to the entries. - async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { + async fn refresh_loop(self: Arc<Self>) -> Result<()> { let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { timer.next().await; @@ -140,13 +139,14 @@ impl RefreshService { } drop(table); - self.clone().do_refresh(&entries, ex.clone()).await; + self.clone().do_refresh(&entries).await; } } /// Iterates over the entries and spawns a new task for each entry to /// initiate a connection attempt to that entry. - async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) { + async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) { + let ex = &self.executor; for chunk in entries.chunks(16) { let mut tasks = Vec::new(); for bucket_entry in chunk { diff --git a/p2p/src/error.rs b/p2p/src/error.rs index 945e90a..91d2c39 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -2,7 +2,7 @@ use thiserror::Error as ThisError; pub type Result<T> = std::result::Result<T, Error>; -/// Represents Karyons's p2p Error. +/// Represents karyons's p2p Error. #[derive(ThisError, Debug)] pub enum Error { #[error("IO Error: {0}")] diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index ee92536..f2391f7 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -4,7 +4,7 @@ use log::{error, info, trace}; use karyons_core::{ async_utils::{TaskGroup, TaskResult}, - Executor, + GlobalExecutor, }; use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; @@ -18,7 +18,7 @@ use crate::{ /// Responsible for creating inbound connections with other peers. pub struct Listener { /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc<ConnectionSlots>, @@ -29,10 +29,14 @@ pub struct Listener { impl Listener { /// Creates a new Listener - pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> { + pub fn new( + connection_slots: Arc<ConnectionSlots>, + monitor: Arc<Monitor>, + ex: GlobalExecutor, + ) -> Arc<Self> { Arc::new(Self { connection_slots, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), monitor, }) } @@ -42,15 +46,14 @@ impl Listener { /// connection to the callback. /// /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( + pub async fn start<Fut>( self: &Arc<Self>, - ex: Executor<'a>, endpoint: Endpoint, // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result<Endpoint> where - Fut: Future<Output = Result<()>> + Send + 'a, + Fut: Future<Output = Result<()>> + Send + 'static, { let listener = match listen(&endpoint).await { Ok(listener) => { @@ -73,15 +76,12 @@ impl Listener { info!("Start listening on {endpoint}"); let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { + self.task_group + .spawn(selfc.listen_loop(listener, callback), |res| async move { if let TaskResult::Completed(Err(err)) = res { error!("Listen loop stopped: {endpoint} {err}"); } - }, - ); + }); Ok(resolved_endpoint) } @@ -90,14 +90,13 @@ impl Listener { self.task_group.cancel().await; } - async fn listen_loop<'a, Fut>( + async fn listen_loop<Fut>( self: Arc<Self>, - ex: Executor<'a>, listener: Box<dyn NetListener>, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, + callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, ) -> Result<()> where - Fut: Future<Output = Result<()>> + Send + 'a, + Fut: Future<Output = Result<()>> + Send + 'static, { loop { // Wait for an available inbound slot. @@ -134,8 +133,7 @@ impl Listener { }; let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); + self.task_group.spawn(callback(conn), on_disconnect); } } } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index cdb9837..d3691c2 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -12,7 +12,7 @@ pub const MSG_HEADER_SIZE: usize = 6; /// The maximum allowed size for a message in bytes. pub const MAX_ALLOWED_MSG_SIZE: u32 = 1000000; -/// Defines the main message in the Karyon P2P network. +/// Defines the main message in the karyon p2p network. /// /// This message structure consists of a header and payload, where the header /// typically contains essential information about the message, and the payload diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index ee0bf44..fbbf43f 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -13,11 +13,19 @@ use karyons_net::Endpoint; /// # Example /// /// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +/// /// use karyons_p2p::{Config, Backend, PeerID}; +/// /// async { /// -/// let backend = Backend::new(PeerID::random(), Config::default()); -/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let backend = Backend::new(PeerID::random(), Config::default(), ex); +/// /// // Create a new Subscription /// let sub = backend.monitor().await; /// diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 60e76a1..85cd558 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -14,7 +14,7 @@ use karyons_core::{ async_utils::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, utils::{decode, encode}, - Executor, + GlobalExecutor, }; use karyons_net::Endpoint; @@ -56,7 +56,7 @@ pub struct Peer { stop_chan: (Sender<Result<()>>, Receiver<Result<()>>), /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, } impl Peer { @@ -67,6 +67,7 @@ impl Peer { io_codec: IOCodec, remote_endpoint: Endpoint, conn_direction: ConnDirection, + ex: GlobalExecutor, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), @@ -76,14 +77,14 @@ impl Peer { remote_endpoint, conn_direction, protocol_events: EventSys::new(), - task_group: TaskGroup::new(), + task_group: TaskGroup::new(ex), stop_chan: channel::bounded(1), }) } /// Run the peer - pub async fn run(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { - self.start_protocols(ex.clone()).await; + pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> { + self.start_protocols(ex).await; self.read_loop().await } @@ -205,7 +206,7 @@ impl Peer { } /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc<Self>, ex: Executor<'_>) { + async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) { for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { trace!("peer {} start protocol {protocol_id}", self.id); let protocol = constructor(self.clone()); @@ -213,7 +214,6 @@ impl Peer { self.protocol_ids.write().await.push(protocol_id.clone()); let selfc = self.clone(); - let exc = ex.clone(); let proto_idc = protocol_id.clone(); let on_failure = |result: TaskResult<Result<()>>| async move { @@ -227,7 +227,7 @@ impl Peer { }; self.task_group - .spawn(ex.clone(), protocol.start(exc), on_failure); + .spawn(protocol.start(ex.clone()), on_failure); } } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 2433cfc..0d17307 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -13,7 +13,7 @@ use smol::{ use karyons_core::{ async_utils::{TaskGroup, TaskResult}, utils::decode, - Executor, + GlobalExecutor, }; use karyons_net::Conn; @@ -51,10 +51,13 @@ pub struct PeerPool { protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>, /// Managing spawned tasks. - task_group: TaskGroup, + task_group: TaskGroup<'static>, + + /// A global Executor + executor: GlobalExecutor, /// The Configuration for the P2P network. - pub config: Arc<Config>, + pub(crate) config: Arc<Config>, /// Responsible for network and system monitoring. monitor: Arc<Monitor>, @@ -67,6 +70,7 @@ impl PeerPool { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, + executor: GlobalExecutor, ) -> Arc<Self> { let protocols = RwLock::new(HashMap::new()); let protocol_versions = Arc::new(RwLock::new(HashMap::new())); @@ -77,23 +81,23 @@ impl PeerPool { peers: Mutex::new(HashMap::new()), protocols, protocol_versions, - task_group: TaskGroup::new(), + task_group: TaskGroup::new(executor.clone()), + executor, monitor, config, }) } /// Start - pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + pub async fn start(self: &Arc<Self>) -> Result<()> { self.setup_protocols().await?; let selfc = self.clone(); - self.task_group - .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {}); + self.task_group.spawn(selfc.listen_loop(), |_| async {}); Ok(()) } /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc<Self>, ex: Executor<'_>) { + pub async fn listen_loop(self: Arc<Self>) { loop { let new_conn = self.conn_queue.next().await; let disconnect_signal = new_conn.disconnect_signal; @@ -103,7 +107,6 @@ impl PeerPool { new_conn.conn, &new_conn.direction, disconnect_signal.clone(), - ex.clone(), ) .await; @@ -128,7 +131,7 @@ impl PeerPool { let protocols = &mut self.protocols.write().await; protocol_versions.insert(P::id(), P::version()?); - protocols.insert(P::id(), Box::new(c) as Box<ProtocolConstructor>); + protocols.insert(P::id(), c); Ok(()) } @@ -153,7 +156,6 @@ impl PeerPool { conn: Conn, conn_direction: &ConnDirection, disconnect_signal: Sender<()>, - ex: Executor<'_>, ) -> Result<PeerID> { let endpoint = conn.peer_endpoint()?; let io_codec = IOCodec::new(conn); @@ -173,6 +175,7 @@ impl PeerPool { io_codec, endpoint.clone(), conn_direction.clone(), + self.executor.clone(), ); // Insert the new peer @@ -190,7 +193,7 @@ impl PeerPool { }; self.task_group - .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect); + .spawn(peer.run(self.executor.clone()), on_disconnect); info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 515efc6..770b695 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -87,8 +87,11 @@ impl EventValue for ProtocolEvent { /// let peer_id = PeerID::random(); /// let config = Config::default(); /// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// /// // Create a new Backend -/// let backend = Backend::new(peer_id, config); +/// let backend = Backend::new(peer_id, config, ex); /// /// // Attach the NewProtocol /// let c = move |peer| NewProtocol::new(peer); diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index b337494..dc1b9a1 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -39,7 +39,6 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, - task_group: TaskGroup, } impl PingProtocol { @@ -51,7 +50,6 @@ impl PingProtocol { peer, ping_interval, ping_timeout, - task_group: TaskGroup::new(), }) } @@ -130,12 +128,14 @@ impl PingProtocol { impl Protocol for PingProtocol { async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { trace!("Start Ping protocol"); + + let task_group = TaskGroup::new(ex); + let (pong_chan, pong_chan_recv) = channel::bounded(1); let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1); let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), + task_group.spawn( selfc.clone().ping_loop(pong_chan_recv.clone()), |res| async move { if let TaskResult::Completed(result) = res { @@ -148,7 +148,7 @@ impl Protocol for PingProtocol { let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; listener.cancel().await; - self.task_group.cancel().await; + task_group.cancel().await; match result { Either::Left(res) => { diff --git a/p2p/src/utils/version.rs b/p2p/src/utils/version.rs index 4986495..a101b28 100644 --- a/p2p/src/utils/version.rs +++ b/p2p/src/utils/version.rs @@ -5,7 +5,7 @@ use semver::VersionReq; use crate::{Error, Result}; -/// Represents the network version and protocol version used in Karyons p2p. +/// Represents the network version and protocol version used in karyons p2p. /// /// # Example /// |