From 379dca552ca91d22ee007b42f93803ad3dc2b274 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 20 Mar 2024 15:20:21 +0100 Subject: core: add the option to create a new task group without providing an executor & remove GlobalExecutor type --- Cargo.lock | 2 ++ core/Cargo.toml | 2 ++ core/src/async_util/executor.rs | 30 ++++++++++++++++++++++++ core/src/async_util/mod.rs | 2 ++ core/src/async_util/task_group.rs | 48 ++++++++++++++++++++++++++++++++++++--- core/src/lib.rs | 9 -------- jsonrpc/Cargo.toml | 1 + jsonrpc/src/server.rs | 5 +--- p2p/examples/shared/mod.rs | 2 +- p2p/src/backend.rs | 4 ++-- p2p/src/connector.rs | 5 ++-- p2p/src/discovery/lookup.rs | 8 +++++-- p2p/src/discovery/mod.rs | 5 ++-- p2p/src/discovery/refresh.rs | 7 +++--- p2p/src/listener.rs | 5 ++-- p2p/src/peer/mod.rs | 5 ++-- p2p/src/peer_pool.rs | 7 +++--- p2p/src/protocols/ping.rs | 5 ++-- 18 files changed, 108 insertions(+), 44 deletions(-) create mode 100644 core/src/async_util/executor.rs diff --git a/Cargo.lock b/Cargo.lock index 8d42b07..ed7ed4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1228,6 +1228,8 @@ dependencies = [ name = "karyon_core" version = "0.1.0" dependencies = [ + "async-lock 3.3.0", + "async-process", "async-task", "bincode", "chrono", diff --git a/core/Cargo.toml b/core/Cargo.toml index 7e7e511..c8e2b8d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,6 +16,8 @@ rand = "0.8.5" thiserror = "1.0.58" dirs = "5.0.1" async-task = "4.7.0" +async-lock = "3.3.0" +async-process = "2.1.0" ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true} diff --git a/core/src/async_util/executor.rs b/core/src/async_util/executor.rs new file mode 100644 index 0000000..3e7aa06 --- /dev/null +++ b/core/src/async_util/executor.rs @@ -0,0 +1,30 @@ +use std::{panic::catch_unwind, sync::Arc, thread}; + +use async_lock::OnceCell; +use smol::Executor as SmolEx; + +static GLOBAL_EXECUTOR: OnceCell>> = OnceCell::new(); + +/// A pointer to an Executor +pub type Executor<'a> = Arc>; + +/// Returns a single-threaded global executor +pub(crate) fn global_executor() -> Executor<'static> { + fn init_executor() -> Executor<'static> { + let ex = smol::Executor::new(); + thread::Builder::new() + .spawn(|| loop { + catch_unwind(|| { + smol::block_on(global_executor().run(smol::future::pending::<()>())) + }) + .ok(); + }) + .expect("cannot spawn executor thread"); + // Prevent spawning another thread by running the process driver on this + // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs + ex.spawn(async_process::driver()).detach(); + Arc::new(ex) + } + + GLOBAL_EXECUTOR.get_or_init_blocking(init_executor).clone() +} diff --git a/core/src/async_util/mod.rs b/core/src/async_util/mod.rs index c871bad..2916118 100644 --- a/core/src/async_util/mod.rs +++ b/core/src/async_util/mod.rs @@ -1,6 +1,7 @@ mod backoff; mod condvar; mod condwait; +mod executor; mod select; mod task_group; mod timeout; @@ -8,6 +9,7 @@ mod timeout; pub use backoff::Backoff; pub use condvar::CondVar; pub use condwait::CondWait; +pub use executor::Executor; pub use select::{select, Either}; pub use task_group::{TaskGroup, TaskResult}; pub use timeout::timeout; diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs index 7129af2..0fb4855 100644 --- a/core/src/async_util/task_group.rs +++ b/core/src/async_util/task_group.rs @@ -2,9 +2,7 @@ use std::{future::Future, sync::Arc, sync::Mutex}; use async_task::FallibleTask; -use crate::Executor; - -use super::{select, CondWait, Either}; +use super::{executor::global_executor, select, CondWait, Either, Executor}; /// TaskGroup is a group of spawned tasks. /// @@ -35,6 +33,19 @@ pub struct TaskGroup<'a> { executor: Executor<'a>, } +impl TaskGroup<'static> { + /// Creates a new task group without providing an executor + /// + /// This will Spawn a task onto a global executor (single-threaded by default). + pub fn new_without_executor() -> Self { + Self { + tasks: Mutex::new(Vec::new()), + stop_signal: Arc::new(CondWait::new()), + executor: global_executor(), + } + } +} + impl<'a> TaskGroup<'a> { /// Creates a new task group pub fn new(executor: Executor<'a>) -> Self { @@ -191,4 +202,35 @@ mod tests { group.cancel().await; })); } + + #[test] + fn test_task_group_without_executor() { + smol::block_on(async { + let group = Arc::new(TaskGroup::new_without_executor()); + + group.spawn(future::ready(0), |res| async move { + assert!(matches!(res, TaskResult::Completed(0))); + }); + + group.spawn(future::pending::<()>(), |res| async move { + assert!(matches!(res, TaskResult::Cancelled)); + }); + + let groupc = group.clone(); + group.spawn( + async move { + groupc.spawn(future::pending::<()>(), |res| async move { + assert!(matches!(res, TaskResult::Cancelled)); + }); + }, + |res| async move { + assert!(matches!(res, TaskResult::Completed(_))); + }, + ); + + // Do something + smol::Timer::after(std::time::Duration::from_millis(50)).await; + group.cancel().await; + }); + } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 442b642..ae88188 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -18,13 +18,4 @@ pub mod pubsub; /// Collects common cryptographic tools pub mod crypto; -use smol::Executor as SmolEx; -use std::sync::Arc; - -/// A pointer to an Executor -pub type Executor<'a> = Arc>; - -/// A Global Executor -pub type GlobalExecutor = Arc>; - use error::Result; diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 5edc317..73ae275 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -2,6 +2,7 @@ name = "karyon_jsonrpc" version.workspace = true edition.workspace = true +autoexamples = false [dependencies] karyon_core.workspace = true diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index b090d5c..ac1673d 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -3,10 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, warn}; use smol::lock::RwLock; -use karyon_core::{ - async_util::{TaskGroup, TaskResult}, - Executor, -}; +use karyon_core::async_util::{Executor, TaskGroup, TaskResult}; use karyon_net::{Conn, Listener, ToListener}; diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs index aad40d7..8065e63 100644 --- a/p2p/examples/shared/mod.rs +++ b/p2p/examples/shared/mod.rs @@ -3,7 +3,7 @@ use std::{num::NonZeroUsize, thread}; use easy_parallel::Parallel; use smol::{channel, future, future::Future}; -use karyon_core::Executor; +use karyon_core::async_util::Executor; /// Returns an estimate of the default amount of parallelism a program should use. /// see `std::thread::available_parallelism` diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 703e7de..d33f3dc 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use log::info; -use karyon_core::{crypto::KeyPair, pubsub::Subscription, GlobalExecutor}; +use karyon_core::{async_util::Executor, crypto::KeyPair, pubsub::Subscription}; use crate::{ config::Config, @@ -37,7 +37,7 @@ pub struct Backend { impl Backend { /// Creates a new Backend. - pub fn new(key_pair: &KeyPair, config: Config, ex: GlobalExecutor) -> ArcBackend { + pub fn new(key_pair: &KeyPair, config: Config, ex: Executor<'static>) -> ArcBackend { let config = Arc::new(config); let monitor = Arc::new(Monitor::new()); let conn_queue = ConnQueue::new(); diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index 41839ab..9bf63f9 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc}; use log::{error, trace, warn}; use karyon_core::{ - async_util::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, Executor, TaskGroup, TaskResult}, crypto::KeyPair, - GlobalExecutor, }; use karyon_net::{tcp, tls, Conn, Endpoint, NetError}; @@ -48,7 +47,7 @@ impl Connector { connection_slots: Arc, enable_tls: bool, monitor: Arc, - ex: GlobalExecutor, + ex: Executor<'static>, ) -> Arc { Arc::new(Self { key_pair: key_pair.clone(), diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 1d73306..c81fbc6 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,7 +5,11 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use smol::lock::{Mutex, RwLock}; -use karyon_core::{async_util::timeout, crypto::KeyPair, util::decode, GlobalExecutor}; +use karyon_core::{ + async_util::{timeout, Executor}, + crypto::KeyPair, + util::decode, +}; use karyon_net::{Conn, Endpoint}; @@ -60,7 +64,7 @@ impl LookupService { table: Arc>, config: Arc, monitor: Arc, - ex: GlobalExecutor, + ex: Executor<'static>, ) -> Self { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 040a415..4b54233 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -8,9 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom}; use smol::lock::Mutex; use karyon_core::{ - async_util::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, Executor, TaskGroup, TaskResult}, crypto::KeyPair, - GlobalExecutor, }; use karyon_net::{Conn, Endpoint}; @@ -72,7 +71,7 @@ impl Discovery { conn_queue: Arc, config: Arc, monitor: Arc, - ex: GlobalExecutor, + ex: Executor<'static>, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index bfcab56..e56f0eb 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -10,9 +10,8 @@ use smol::{ }; use karyon_core::{ - async_util::{timeout, Backoff, TaskGroup, TaskResult}, + async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult}, util::{decode, encode}, - GlobalExecutor, }; use karyon_net::{udp, Connection, Endpoint, NetError}; @@ -46,7 +45,7 @@ pub struct RefreshService { task_group: TaskGroup<'static>, /// A global executor - executor: GlobalExecutor, + executor: Executor<'static>, /// Holds the configuration for the P2P network. config: Arc, @@ -61,7 +60,7 @@ impl RefreshService { config: Arc, table: Arc>, monitor: Arc, - executor: GlobalExecutor, + executor: Executor<'static>, ) -> Self { let listen_endpoint = config .listen_endpoint diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 17aa187..c9b7390 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc}; use log::{debug, error, info}; use karyon_core::{ - async_util::{TaskGroup, TaskResult}, + async_util::{Executor, TaskGroup, TaskResult}, crypto::KeyPair, - GlobalExecutor, }; use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint}; @@ -42,7 +41,7 @@ impl Listener { connection_slots: Arc, enable_tls: bool, monitor: Arc, - ex: GlobalExecutor, + ex: Executor<'static>, ) -> Arc { Arc::new(Self { key_pair: key_pair.clone(), diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 1e98f1b..1fc5ccf 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -11,10 +11,9 @@ use smol::{ }; use karyon_core::{ - async_util::{select, Either, TaskGroup, TaskResult}, + async_util::{select, Either, Executor, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, util::{decode, encode}, - GlobalExecutor, }; use karyon_net::Endpoint; @@ -67,7 +66,7 @@ impl Peer { codec: Codec, remote_endpoint: Endpoint, conn_direction: ConnDirection, - ex: GlobalExecutor, + ex: Executor<'static>, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index ead6d8f..48499fe 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -11,9 +11,8 @@ use smol::{ }; use karyon_core::{ - async_util::{TaskGroup, TaskResult}, + async_util::{Executor, TaskGroup, TaskResult}, util::decode, - GlobalExecutor, }; use karyon_net::Conn; @@ -54,7 +53,7 @@ pub struct PeerPool { task_group: TaskGroup<'static>, /// A global Executor - executor: GlobalExecutor, + executor: Executor<'static>, /// The Configuration for the P2P network. pub(crate) config: Arc, @@ -70,7 +69,7 @@ impl PeerPool { conn_queue: Arc, config: Arc, monitor: Arc, - executor: GlobalExecutor, + executor: Executor<'static>, ) -> Arc { let protocols = RwLock::new(HashMap::new()); let protocol_versions = Arc::new(RwLock::new(HashMap::new())); diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index 22c1b3d..4885c1e 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -12,10 +12,9 @@ use smol::{ }; use karyon_core::{ - async_util::{select, timeout, Either, TaskGroup, TaskResult}, + async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult}, event::EventListener, util::decode, - GlobalExecutor, }; use karyon_net::NetError; @@ -44,7 +43,7 @@ pub struct PingProtocol { impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { -- cgit v1.2.3