aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--core/Cargo.toml2
-rw-r--r--core/src/async_util/executor.rs30
-rw-r--r--core/src/async_util/mod.rs2
-rw-r--r--core/src/async_util/task_group.rs48
-rw-r--r--core/src/lib.rs9
-rw-r--r--jsonrpc/Cargo.toml1
-rw-r--r--jsonrpc/src/server.rs5
-rw-r--r--p2p/examples/shared/mod.rs2
-rw-r--r--p2p/src/backend.rs4
-rw-r--r--p2p/src/connector.rs5
-rw-r--r--p2p/src/discovery/lookup.rs8
-rw-r--r--p2p/src/discovery/mod.rs5
-rw-r--r--p2p/src/discovery/refresh.rs7
-rw-r--r--p2p/src/listener.rs5
-rw-r--r--p2p/src/peer/mod.rs5
-rw-r--r--p2p/src/peer_pool.rs7
-rw-r--r--p2p/src/protocols/ping.rs5
18 files changed, 108 insertions, 44 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8d42b07..ed7ed4c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1228,6 +1228,8 @@ dependencies = [
name = "karyon_core"
version = "0.1.0"
dependencies = [
+ "async-lock 3.3.0",
+ "async-process",
"async-task",
"bincode",
"chrono",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 7e7e511..c8e2b8d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -16,6 +16,8 @@ rand = "0.8.5"
thiserror = "1.0.58"
dirs = "5.0.1"
async-task = "4.7.0"
+async-lock = "3.3.0"
+async-process = "2.1.0"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true}
diff --git a/core/src/async_util/executor.rs b/core/src/async_util/executor.rs
new file mode 100644
index 0000000..3e7aa06
--- /dev/null
+++ b/core/src/async_util/executor.rs
@@ -0,0 +1,30 @@
+use std::{panic::catch_unwind, sync::Arc, thread};
+
+use async_lock::OnceCell;
+use smol::Executor as SmolEx;
+
+static GLOBAL_EXECUTOR: OnceCell<Arc<smol::Executor<'_>>> = OnceCell::new();
+
+/// A pointer to an Executor
+pub type Executor<'a> = Arc<SmolEx<'a>>;
+
+/// Returns a single-threaded global executor
+pub(crate) fn global_executor() -> Executor<'static> {
+ fn init_executor() -> Executor<'static> {
+ let ex = smol::Executor::new();
+ thread::Builder::new()
+ .spawn(|| loop {
+ catch_unwind(|| {
+ smol::block_on(global_executor().run(smol::future::pending::<()>()))
+ })
+ .ok();
+ })
+ .expect("cannot spawn executor thread");
+ // Prevent spawning another thread by running the process driver on this
+ // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs
+ ex.spawn(async_process::driver()).detach();
+ Arc::new(ex)
+ }
+
+ GLOBAL_EXECUTOR.get_or_init_blocking(init_executor).clone()
+}
diff --git a/core/src/async_util/mod.rs b/core/src/async_util/mod.rs
index c871bad..2916118 100644
--- a/core/src/async_util/mod.rs
+++ b/core/src/async_util/mod.rs
@@ -1,6 +1,7 @@
mod backoff;
mod condvar;
mod condwait;
+mod executor;
mod select;
mod task_group;
mod timeout;
@@ -8,6 +9,7 @@ mod timeout;
pub use backoff::Backoff;
pub use condvar::CondVar;
pub use condwait::CondWait;
+pub use executor::Executor;
pub use select::{select, Either};
pub use task_group::{TaskGroup, TaskResult};
pub use timeout::timeout;
diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs
index 7129af2..0fb4855 100644
--- a/core/src/async_util/task_group.rs
+++ b/core/src/async_util/task_group.rs
@@ -2,9 +2,7 @@ use std::{future::Future, sync::Arc, sync::Mutex};
use async_task::FallibleTask;
-use crate::Executor;
-
-use super::{select, CondWait, Either};
+use super::{executor::global_executor, select, CondWait, Either, Executor};
/// TaskGroup is a group of spawned tasks.
///
@@ -35,6 +33,19 @@ pub struct TaskGroup<'a> {
executor: Executor<'a>,
}
+impl TaskGroup<'static> {
+ /// Creates a new task group without providing an executor
+ ///
+ /// This will Spawn a task onto a global executor (single-threaded by default).
+ pub fn new_without_executor() -> Self {
+ Self {
+ tasks: Mutex::new(Vec::new()),
+ stop_signal: Arc::new(CondWait::new()),
+ executor: global_executor(),
+ }
+ }
+}
+
impl<'a> TaskGroup<'a> {
/// Creates a new task group
pub fn new(executor: Executor<'a>) -> Self {
@@ -191,4 +202,35 @@ mod tests {
group.cancel().await;
}));
}
+
+ #[test]
+ fn test_task_group_without_executor() {
+ smol::block_on(async {
+ let group = Arc::new(TaskGroup::new_without_executor());
+
+ group.spawn(future::ready(0), |res| async move {
+ assert!(matches!(res, TaskResult::Completed(0)));
+ });
+
+ group.spawn(future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+
+ let groupc = group.clone();
+ group.spawn(
+ async move {
+ groupc.spawn(future::pending::<()>(), |res| async move {
+ assert!(matches!(res, TaskResult::Cancelled));
+ });
+ },
+ |res| async move {
+ assert!(matches!(res, TaskResult::Completed(_)));
+ },
+ );
+
+ // Do something
+ smol::Timer::after(std::time::Duration::from_millis(50)).await;
+ group.cancel().await;
+ });
+ }
}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 442b642..ae88188 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -18,13 +18,4 @@ pub mod pubsub;
/// Collects common cryptographic tools
pub mod crypto;
-use smol::Executor as SmolEx;
-use std::sync::Arc;
-
-/// A pointer to an Executor
-pub type Executor<'a> = Arc<SmolEx<'a>>;
-
-/// A Global Executor
-pub type GlobalExecutor = Arc<SmolEx<'static>>;
-
use error::Result;
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml
index 5edc317..73ae275 100644
--- a/jsonrpc/Cargo.toml
+++ b/jsonrpc/Cargo.toml
@@ -2,6 +2,7 @@
name = "karyon_jsonrpc"
version.workspace = true
edition.workspace = true
+autoexamples = false
[dependencies]
karyon_core.workspace = true
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs
index b090d5c..ac1673d 100644
--- a/jsonrpc/src/server.rs
+++ b/jsonrpc/src/server.rs
@@ -3,10 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use log::{debug, error, warn};
use smol::lock::RwLock;
-use karyon_core::{
- async_util::{TaskGroup, TaskResult},
- Executor,
-};
+use karyon_core::async_util::{Executor, TaskGroup, TaskResult};
use karyon_net::{Conn, Listener, ToListener};
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
index aad40d7..8065e63 100644
--- a/p2p/examples/shared/mod.rs
+++ b/p2p/examples/shared/mod.rs
@@ -3,7 +3,7 @@ use std::{num::NonZeroUsize, thread};
use easy_parallel::Parallel;
use smol::{channel, future, future::Future};
-use karyon_core::Executor;
+use karyon_core::async_util::Executor;
/// Returns an estimate of the default amount of parallelism a program should use.
/// see `std::thread::available_parallelism`
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 703e7de..d33f3dc 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use log::info;
-use karyon_core::{crypto::KeyPair, pubsub::Subscription, GlobalExecutor};
+use karyon_core::{async_util::Executor, crypto::KeyPair, pubsub::Subscription};
use crate::{
config::Config,
@@ -37,7 +37,7 @@ pub struct Backend {
impl Backend {
/// Creates a new Backend.
- pub fn new(key_pair: &KeyPair, config: Config, ex: GlobalExecutor) -> ArcBackend {
+ pub fn new(key_pair: &KeyPair, config: Config, ex: Executor<'static>) -> ArcBackend {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new());
let conn_queue = ConnQueue::new();
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index 41839ab..9bf63f9 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc};
use log::{error, trace, warn};
use karyon_core::{
- async_util::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{tcp, tls, Conn, Endpoint, NetError};
@@ -48,7 +47,7 @@ impl Connector {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 1d73306..c81fbc6 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,7 +5,11 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyon_core::{async_util::timeout, crypto::KeyPair, util::decode, GlobalExecutor};
+use karyon_core::{
+ async_util::{timeout, Executor},
+ crypto::KeyPair,
+ util::decode,
+};
use karyon_net::{Conn, Endpoint};
@@ -60,7 +64,7 @@ impl LookupService {
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Self {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 040a415..4b54233 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -8,9 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use smol::lock::Mutex;
use karyon_core::{
- async_util::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{Conn, Endpoint};
@@ -72,7 +71,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index bfcab56..e56f0eb 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -10,9 +10,8 @@ use smol::{
};
use karyon_core::{
- async_util::{timeout, Backoff, TaskGroup, TaskResult},
+ async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult},
util::{decode, encode},
- GlobalExecutor,
};
use karyon_net::{udp, Connection, Endpoint, NetError};
@@ -46,7 +45,7 @@ pub struct RefreshService {
task_group: TaskGroup<'static>,
/// A global executor
- executor: GlobalExecutor,
+ executor: Executor<'static>,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -61,7 +60,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
- executor: GlobalExecutor,
+ executor: Executor<'static>,
) -> Self {
let listen_endpoint = config
.listen_endpoint
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 17aa187..c9b7390 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc};
use log::{debug, error, info};
use karyon_core::{
- async_util::{TaskGroup, TaskResult},
+ async_util::{Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint};
@@ -42,7 +41,7 @@ impl Listener {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 1e98f1b..1fc5ccf 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -11,10 +11,9 @@ use smol::{
};
use karyon_core::{
- async_util::{select, Either, TaskGroup, TaskResult},
+ async_util::{select, Either, Executor, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
util::{decode, encode},
- GlobalExecutor,
};
use karyon_net::Endpoint;
@@ -67,7 +66,7 @@ impl Peer {
codec: Codec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> ArcPeer {
Arc::new(Peer {
id: id.clone(),
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index ead6d8f..48499fe 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -11,9 +11,8 @@ use smol::{
};
use karyon_core::{
- async_util::{TaskGroup, TaskResult},
+ async_util::{Executor, TaskGroup, TaskResult},
util::decode,
- GlobalExecutor,
};
use karyon_net::Conn;
@@ -54,7 +53,7 @@ pub struct PeerPool {
task_group: TaskGroup<'static>,
/// A global Executor
- executor: GlobalExecutor,
+ executor: Executor<'static>,
/// The Configuration for the P2P network.
pub(crate) config: Arc<Config>,
@@ -70,7 +69,7 @@ impl PeerPool {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- executor: GlobalExecutor,
+ executor: Executor<'static>,
) -> Arc<Self> {
let protocols = RwLock::new(HashMap::new());
let protocol_versions = Arc::new(RwLock::new(HashMap::new()));
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index 22c1b3d..4885c1e 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -12,10 +12,9 @@ use smol::{
};
use karyon_core::{
- async_util::{select, timeout, Either, TaskGroup, TaskResult},
+ async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult},
event::EventListener,
util::decode,
- GlobalExecutor,
};
use karyon_net::NetError;
@@ -44,7 +43,7 @@ pub struct PingProtocol {
impl PingProtocol {
#[allow(clippy::new_ret_no_self)]
- pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol {
+ pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol {
let ping_interval = peer.config().ping_interval;
let ping_timeout = peer.config().ping_timeout;
Arc::new(Self {