aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/listener.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
committerhozan23 <hozan23@proton.me>2023-11-15 17:16:39 +0300
commit78884caca030104557ca277dd3a41cefb70f5be8 (patch)
treec33650dfe44a219e395dff1966d298b58b09acb3 /p2p/src/listener.rs
parentf0729022589ee8e48b5558ab30462f95d06fe6df (diff)
improve the TaskGroup API
the TaskGroup now holds an Executor instead of passing it when calling its spawn method also, define a global executor `Executor<'static>` and use static lifetime instead of a lifetime placeholder This improvement simplify the code for spawning a new task. There is no need to pass the executor around.
Diffstat (limited to 'p2p/src/listener.rs')
-rw-r--r--p2p/src/listener.rs38
1 files changed, 18 insertions, 20 deletions
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index ee92536..f2391f7 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -4,7 +4,7 @@ use log::{error, info, trace};
use karyons_core::{
async_utils::{TaskGroup, TaskResult},
- Executor,
+ GlobalExecutor,
};
use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
@@ -18,7 +18,7 @@ use crate::{
/// Responsible for creating inbound connections with other peers.
pub struct Listener {
/// Managing spawned tasks.
- task_group: TaskGroup,
+ task_group: TaskGroup<'static>,
/// Manages available inbound slots.
connection_slots: Arc<ConnectionSlots>,
@@ -29,10 +29,14 @@ pub struct Listener {
impl Listener {
/// Creates a new Listener
- pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> {
+ pub fn new(
+ connection_slots: Arc<ConnectionSlots>,
+ monitor: Arc<Monitor>,
+ ex: GlobalExecutor,
+ ) -> Arc<Self> {
Arc::new(Self {
connection_slots,
- task_group: TaskGroup::new(),
+ task_group: TaskGroup::new(ex),
monitor,
})
}
@@ -42,15 +46,14 @@ impl Listener {
/// connection to the callback.
///
/// Returns the resloved listening endpoint.
- pub async fn start<'a, Fut>(
+ pub async fn start<Fut>(
self: &Arc<Self>,
- ex: Executor<'a>,
endpoint: Endpoint,
// https://github.com/rust-lang/rfcs/pull/2132
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
) -> Result<Endpoint>
where
- Fut: Future<Output = Result<()>> + Send + 'a,
+ Fut: Future<Output = Result<()>> + Send + 'static,
{
let listener = match listen(&endpoint).await {
Ok(listener) => {
@@ -73,15 +76,12 @@ impl Listener {
info!("Start listening on {endpoint}");
let selfc = self.clone();
- self.task_group.spawn(
- ex.clone(),
- selfc.listen_loop(ex.clone(), listener, callback),
- |res| async move {
+ self.task_group
+ .spawn(selfc.listen_loop(listener, callback), |res| async move {
if let TaskResult::Completed(Err(err)) = res {
error!("Listen loop stopped: {endpoint} {err}");
}
- },
- );
+ });
Ok(resolved_endpoint)
}
@@ -90,14 +90,13 @@ impl Listener {
self.task_group.cancel().await;
}
- async fn listen_loop<'a, Fut>(
+ async fn listen_loop<Fut>(
self: Arc<Self>,
- ex: Executor<'a>,
listener: Box<dyn NetListener>,
- callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a,
+ callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
) -> Result<()>
where
- Fut: Future<Output = Result<()>> + Send + 'a,
+ Fut: Future<Output = Result<()>> + Send + 'static,
{
loop {
// Wait for an available inbound slot.
@@ -134,8 +133,7 @@ impl Listener {
};
let callback = callback.clone();
- self.task_group
- .spawn(ex.clone(), callback(conn), on_disconnect);
+ self.task_group.spawn(callback(conn), on_disconnect);
}
}
}