aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2024-03-20 15:20:21 +0100
committerhozan23 <hozan23@proton.me>2024-03-20 15:20:21 +0100
commit379dca552ca91d22ee007b42f93803ad3dc2b274 (patch)
tree1ba10573b6ec5213baf46f6ab9c3767e0daa4eed /p2p
parent340957fec147f4429796413f27bbd9b84ba6f141 (diff)
core: add the option to create a new task group without providing an
executor & remove GlobalExecutor type
Diffstat (limited to 'p2p')
-rw-r--r--p2p/examples/shared/mod.rs2
-rw-r--r--p2p/src/backend.rs4
-rw-r--r--p2p/src/connector.rs5
-rw-r--r--p2p/src/discovery/lookup.rs8
-rw-r--r--p2p/src/discovery/mod.rs5
-rw-r--r--p2p/src/discovery/refresh.rs7
-rw-r--r--p2p/src/listener.rs5
-rw-r--r--p2p/src/peer/mod.rs5
-rw-r--r--p2p/src/peer_pool.rs7
-rw-r--r--p2p/src/protocols/ping.rs5
10 files changed, 25 insertions, 28 deletions
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
index aad40d7..8065e63 100644
--- a/p2p/examples/shared/mod.rs
+++ b/p2p/examples/shared/mod.rs
@@ -3,7 +3,7 @@ use std::{num::NonZeroUsize, thread};
use easy_parallel::Parallel;
use smol::{channel, future, future::Future};
-use karyon_core::Executor;
+use karyon_core::async_util::Executor;
/// Returns an estimate of the default amount of parallelism a program should use.
/// see `std::thread::available_parallelism`
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 703e7de..d33f3dc 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use log::info;
-use karyon_core::{crypto::KeyPair, pubsub::Subscription, GlobalExecutor};
+use karyon_core::{async_util::Executor, crypto::KeyPair, pubsub::Subscription};
use crate::{
config::Config,
@@ -37,7 +37,7 @@ pub struct Backend {
impl Backend {
/// Creates a new Backend.
- pub fn new(key_pair: &KeyPair, config: Config, ex: GlobalExecutor) -> ArcBackend {
+ pub fn new(key_pair: &KeyPair, config: Config, ex: Executor<'static>) -> ArcBackend {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new());
let conn_queue = ConnQueue::new();
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index 41839ab..9bf63f9 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc};
use log::{error, trace, warn};
use karyon_core::{
- async_util::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{tcp, tls, Conn, Endpoint, NetError};
@@ -48,7 +47,7 @@ impl Connector {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 1d73306..c81fbc6 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,7 +5,11 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyon_core::{async_util::timeout, crypto::KeyPair, util::decode, GlobalExecutor};
+use karyon_core::{
+ async_util::{timeout, Executor},
+ crypto::KeyPair,
+ util::decode,
+};
use karyon_net::{Conn, Endpoint};
@@ -60,7 +64,7 @@ impl LookupService {
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Self {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 040a415..4b54233 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -8,9 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use smol::lock::Mutex;
use karyon_core::{
- async_util::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{Conn, Endpoint};
@@ -72,7 +71,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index bfcab56..e56f0eb 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -10,9 +10,8 @@ use smol::{
};
use karyon_core::{
- async_util::{timeout, Backoff, TaskGroup, TaskResult},
+ async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult},
util::{decode, encode},
- GlobalExecutor,
};
use karyon_net::{udp, Connection, Endpoint, NetError};
@@ -46,7 +45,7 @@ pub struct RefreshService {
task_group: TaskGroup<'static>,
/// A global executor
- executor: GlobalExecutor,
+ executor: Executor<'static>,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -61,7 +60,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
- executor: GlobalExecutor,
+ executor: Executor<'static>,
) -> Self {
let listen_endpoint = config
.listen_endpoint
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 17aa187..c9b7390 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -3,9 +3,8 @@ use std::{future::Future, sync::Arc};
use log::{debug, error, info};
use karyon_core::{
- async_util::{TaskGroup, TaskResult},
+ async_util::{Executor, TaskGroup, TaskResult},
crypto::KeyPair,
- GlobalExecutor,
};
use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint};
@@ -42,7 +41,7 @@ impl Listener {
connection_slots: Arc<ConnectionSlots>,
enable_tls: bool,
monitor: Arc<Monitor>,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> Arc<Self> {
Arc::new(Self {
key_pair: key_pair.clone(),
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 1e98f1b..1fc5ccf 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -11,10 +11,9 @@ use smol::{
};
use karyon_core::{
- async_util::{select, Either, TaskGroup, TaskResult},
+ async_util::{select, Either, Executor, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
util::{decode, encode},
- GlobalExecutor,
};
use karyon_net::Endpoint;
@@ -67,7 +66,7 @@ impl Peer {
codec: Codec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
- ex: GlobalExecutor,
+ ex: Executor<'static>,
) -> ArcPeer {
Arc::new(Peer {
id: id.clone(),
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index ead6d8f..48499fe 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -11,9 +11,8 @@ use smol::{
};
use karyon_core::{
- async_util::{TaskGroup, TaskResult},
+ async_util::{Executor, TaskGroup, TaskResult},
util::decode,
- GlobalExecutor,
};
use karyon_net::Conn;
@@ -54,7 +53,7 @@ pub struct PeerPool {
task_group: TaskGroup<'static>,
/// A global Executor
- executor: GlobalExecutor,
+ executor: Executor<'static>,
/// The Configuration for the P2P network.
pub(crate) config: Arc<Config>,
@@ -70,7 +69,7 @@ impl PeerPool {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- executor: GlobalExecutor,
+ executor: Executor<'static>,
) -> Arc<Self> {
let protocols = RwLock::new(HashMap::new());
let protocol_versions = Arc::new(RwLock::new(HashMap::new()));
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index 22c1b3d..4885c1e 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -12,10 +12,9 @@ use smol::{
};
use karyon_core::{
- async_util::{select, timeout, Either, TaskGroup, TaskResult},
+ async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult},
event::EventListener,
util::decode,
- GlobalExecutor,
};
use karyon_net::NetError;
@@ -44,7 +43,7 @@ pub struct PingProtocol {
impl PingProtocol {
#[allow(clippy::new_ret_no_self)]
- pub fn new(peer: ArcPeer, executor: GlobalExecutor) -> ArcProtocol {
+ pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol {
let ping_interval = peer.config().ping_interval;
let ping_timeout = peer.config().ping_timeout;
Arc::new(Self {