diff options
Diffstat (limited to 'p2p')
| -rw-r--r-- | p2p/examples/chat.rs | 44 | ||||
| -rw-r--r-- | p2p/examples/monitor.rs | 60 | ||||
| -rw-r--r-- | p2p/examples/peer.rs | 43 | ||||
| -rw-r--r-- | p2p/examples/shared/mod.rs | 33 | ||||
| -rw-r--r-- | p2p/src/backend.rs | 23 | ||||
| -rw-r--r-- | p2p/src/connector.rs | 17 | ||||
| -rw-r--r-- | p2p/src/discovery/lookup.rs | 15 | ||||
| -rw-r--r-- | p2p/src/discovery/mod.rs | 51 | ||||
| -rw-r--r-- | p2p/src/discovery/refresh.rs | 38 | ||||
| -rw-r--r-- | p2p/src/error.rs | 2 | ||||
| -rw-r--r-- | p2p/src/listener.rs | 38 | ||||
| -rw-r--r-- | p2p/src/message.rs | 2 | ||||
| -rw-r--r-- | p2p/src/monitor.rs | 12 | ||||
| -rw-r--r-- | p2p/src/peer/mod.rs | 16 | ||||
| -rw-r--r-- | p2p/src/peer_pool.rs | 27 | ||||
| -rw-r--r-- | p2p/src/protocol.rs | 5 | ||||
| -rw-r--r-- | p2p/src/protocols/ping.rs | 10 | ||||
| -rw-r--r-- | p2p/src/utils/version.rs | 2 | 
18 files changed, 244 insertions, 194 deletions
| diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 4358362..907ba06 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -1,9 +1,11 @@ +mod shared; +  use std::sync::Arc;  use async_std::io;  use async_trait::async_trait;  use clap::Parser; -use smol::{channel, future, Executor}; +use smol::{channel, Executor};  use karyons_net::{Endpoint, Port}; @@ -12,6 +14,8 @@ use karyons_p2p::{      ArcPeer, Backend, Config, P2pError, PeerID, Version,  }; +use shared::run_executor; +  #[derive(Parser)]  #[command(author, version, about, long_about = None)]  struct Cli { @@ -109,33 +113,33 @@ fn main() {          ..Default::default()      }; +    // Create a new Executor +    let ex = Arc::new(Executor::new()); +      // Create a new Backend -    let backend = Backend::new(peer_id, config); +    let backend = Backend::new(peer_id, config, ex.clone());      let (ctrlc_s, ctrlc_r) = channel::unbounded();      let handle = move || ctrlc_s.try_send(()).unwrap();      ctrlc::set_handler(handle).unwrap(); -    // Create a new Executor -    let ex = Arc::new(Executor::new()); - -    let ex_cloned = ex.clone(); -    let task = ex.spawn(async { -        let username = cli.username; - -        // Attach the ChatProtocol -        let c = move |peer| ChatProtocol::new(&username, peer); -        backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); +    run_executor( +        async { +            let username = cli.username; -        // Run the backend -        backend.run(ex_cloned).await.unwrap(); +            // Attach the ChatProtocol +            let c = move |peer| ChatProtocol::new(&username, peer); +            backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); -        // Wait for ctrlc signal -        ctrlc_r.recv().await.unwrap(); +            // Run the backend +            backend.run().await.unwrap(); -        // Shutdown the backend -        backend.shutdown().await; -    }); +            // Wait for ctrlc signal +            ctrlc_r.recv().await.unwrap(); -    future::block_on(ex.run(task)); +            // Shutdown the backend +            backend.shutdown().await; +        }, +        ex, +    );  } diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index cd4defc..fc48c2f 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -1,13 +1,16 @@ +mod shared; +  use std::sync::Arc;  use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor};  use karyons_net::{Endpoint, Port};  use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; +  #[derive(Parser)]  #[command(author, version, about, long_about = None)]  struct Cli { @@ -50,44 +53,39 @@ fn main() {          ..Default::default()      }; +    // Create a new Executor +    let ex = Arc::new(Executor::new()); +      // Create a new Backend -    let backend = Backend::new(peer_id, config); +    let backend = Backend::new(peer_id, config, ex.clone());      let (ctrlc_s, ctrlc_r) = channel::unbounded();      let handle = move || ctrlc_s.try_send(()).unwrap();      ctrlc::set_handler(handle).unwrap(); -    let (signal, shutdown) = channel::unbounded::<()>(); - -    // Create a new Executor -    let ex = Arc::new(Executor::new()); - -    let task = async { -        let monitor = backend.monitor().await; +    let exc = ex.clone(); +    run_executor( +        async { +            let monitor = backend.monitor().await; -        let monitor_task = ex.spawn(async move { -            loop { -                let event = monitor.recv().await.unwrap(); -                println!("{}", event); -            } -        }); +            let monitor_task = exc.spawn(async move { +                loop { +                    let event = monitor.recv().await.unwrap(); +                    println!("{}", event); +                } +            }); -        // Run the backend -        backend.run(ex.clone()).await.unwrap(); +            // Run the backend +            backend.run().await.unwrap(); -        // Wait for ctrlc signal -        ctrlc_r.recv().await.unwrap(); +            // Wait for ctrlc signal +            ctrlc_r.recv().await.unwrap(); -        // Shutdown the backend -        backend.shutdown().await; - -        monitor_task.cancel().await; - -        drop(signal); -    }; +            // Shutdown the backend +            backend.shutdown().await; -    // Run four executor threads. -    Parallel::new() -        .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) -        .finish(|| future::block_on(task)); +            monitor_task.cancel().await; +        }, +        ex, +    );  } diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index f805d68..5ff365d 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -1,13 +1,16 @@ +mod shared; +  use std::sync::Arc;  use clap::Parser; -use easy_parallel::Parallel; -use smol::{channel, future, Executor}; +use smol::{channel, Executor};  use karyons_net::{Endpoint, Port};  use karyons_p2p::{Backend, Config, PeerID}; +use shared::run_executor; +  #[derive(Parser)]  #[command(author, version, about, long_about = None)]  struct Cli { @@ -50,33 +53,27 @@ fn main() {          ..Default::default()      }; +    // Create a new Executor +    let ex = Arc::new(Executor::new()); +      // Create a new Backend -    let backend = Backend::new(peer_id, config); +    let backend = Backend::new(peer_id, config, ex.clone());      let (ctrlc_s, ctrlc_r) = channel::unbounded();      let handle = move || ctrlc_s.try_send(()).unwrap();      ctrlc::set_handler(handle).unwrap(); -    let (signal, shutdown) = channel::unbounded::<()>(); - -    // Create a new Executor -    let ex = Arc::new(Executor::new()); - -    let task = async { -        // Run the backend -        backend.run(ex.clone()).await.unwrap(); +    run_executor( +        async { +            // Run the backend +            backend.run().await.unwrap(); -        // Wait for ctrlc signal -        ctrlc_r.recv().await.unwrap(); - -        // Shutdown the backend -        backend.shutdown().await; - -        drop(signal); -    }; +            // Wait for ctrlc signal +            ctrlc_r.recv().await.unwrap(); -    // Run four executor threads. -    Parallel::new() -        .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) -        .finish(|| future::block_on(task)); +            // Shutdown the backend +            backend.shutdown().await; +        }, +        ex, +    );  } diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs new file mode 100644 index 0000000..9a8e387 --- /dev/null +++ b/p2p/examples/shared/mod.rs @@ -0,0 +1,33 @@ +use std::{num::NonZeroUsize, thread}; + +use easy_parallel::Parallel; +use smol::{channel, future, future::Future}; + +use karyons_core::Executor; + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +fn available_parallelism() -> usize { +    thread::available_parallelism() +        .map(NonZeroUsize::get) +        .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) { +    let (signal, shutdown) = channel::unbounded::<()>(); + +    let num_threads = available_parallelism(); + +    Parallel::new() +        .each(0..(num_threads), |_| { +            future::block_on(ex.run(shutdown.recv())) +        }) +        // Run the main future on the current thread. +        .finish(|| { +            future::block_on(async { +                main_future.await; +                drop(signal); +            }) +        }); +} diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index bb18f06..bb0d891 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,7 +2,7 @@ use std::sync::Arc;  use log::info; -use karyons_core::{pubsub::Subscription, Executor}; +use karyons_core::{pubsub::Subscription, GlobalExecutor};  use crate::{      config::Config, @@ -34,15 +34,16 @@ pub type ArcBackend = Arc<Backend>;  /// // Create the configuration for the backend.  /// let mut config = Config::default();  /// -/// // Create a new Backend -/// let backend = Backend::new(peer_id, config);  ///  /// // Create a new Executor  /// let ex = Arc::new(Executor::new());  /// +/// // Create a new Backend +/// let backend = Backend::new(peer_id, config, ex.clone()); +///  /// let task = async {  ///     // Run the backend -///     backend.run(ex.clone()).await.unwrap(); +///     backend.run().await.unwrap();  ///  ///     // ....  /// @@ -72,14 +73,14 @@ pub struct Backend {  impl Backend {      /// Creates a new Backend. -    pub fn new(id: PeerID, config: Config) -> ArcBackend { +    pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend {          let config = Arc::new(config);          let monitor = Arc::new(Monitor::new()); +        let cq = ConnQueue::new(); -        let conn_queue = ConnQueue::new(); +        let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone()); -        let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone()); -        let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone()); +        let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex);          Arc::new(Self {              id: id.clone(), @@ -91,10 +92,10 @@ impl Backend {      }      /// Run the Backend, starting the PeerPool and Discovery instances. -    pub async fn run(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { +    pub async fn run(self: &Arc<Self>) -> Result<()> {          info!("Run the backend {}", self.id); -        self.peer_pool.start(ex.clone()).await?; -        self.discovery.start(ex.clone()).await?; +        self.peer_pool.start().await?; +        self.discovery.start().await?;          Ok(())      } diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index 3932c41..f41ab57 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -4,7 +4,7 @@ use log::{trace, warn};  use karyons_core::{      async_utils::{Backoff, TaskGroup, TaskResult}, -    Executor, +    GlobalExecutor,  };  use karyons_net::{dial, Conn, Endpoint, NetError}; @@ -17,7 +17,7 @@ use crate::{  /// Responsible for creating outbound connections with other peers.  pub struct Connector {      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>,      /// Manages available outbound slots.      connection_slots: Arc<ConnectionSlots>, @@ -36,9 +36,10 @@ impl Connector {          max_retries: usize,          connection_slots: Arc<ConnectionSlots>,          monitor: Arc<Monitor>, +        ex: GlobalExecutor,      ) -> Arc<Self> {          Arc::new(Self { -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(ex),              monitor,              connection_slots,              max_retries, @@ -92,14 +93,13 @@ impl Connector {      /// Establish a connection to the given `endpoint`. For each new connection,      /// it invokes the provided `callback`, and pass the connection to the callback. -    pub async fn connect_with_cback<'a, Fut>( +    pub async fn connect_with_cback<Fut>(          self: &Arc<Self>, -        ex: Executor<'a>,          endpoint: &Endpoint, -        callback: impl FnOnce(Conn) -> Fut + Send + 'a, +        callback: impl FnOnce(Conn) -> Fut + Send + 'static,      ) -> Result<()>      where -        Fut: Future<Output = Result<()>> + Send + 'a, +        Fut: Future<Output = Result<()>> + Send + 'static,      {          let conn = self.connect(endpoint).await?; @@ -116,8 +116,7 @@ impl Connector {              selfc.connection_slots.remove().await;          }; -        self.task_group -            .spawn(ex.clone(), callback(conn), on_disconnect); +        self.task_group.spawn(callback(conn), on_disconnect);          Ok(())      } diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 94da900..52aa339 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,7 +5,7 @@ use log::{error, trace};  use rand::{rngs::OsRng, seq::SliceRandom, RngCore};  use smol::lock::{Mutex, RwLock}; -use karyons_core::{async_utils::timeout, utils::decode, Executor}; +use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor};  use karyons_net::{Conn, Endpoint}; @@ -59,15 +59,18 @@ impl LookupService {          table: Arc<Mutex<RoutingTable>>,          config: Arc<Config>,          monitor: Arc<Monitor>, +        ex: GlobalExecutor,      ) -> Self {          let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));          let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); -        let listener = Listener::new(inbound_slots.clone(), monitor.clone()); +        let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); +          let connector = Connector::new(              config.lookup_connect_retries,              outbound_slots.clone(),              monitor.clone(), +            ex,          );          let listen_endpoint = config @@ -88,8 +91,8 @@ impl LookupService {      }      /// Start the lookup service. -    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { -        self.start_listener(ex).await?; +    pub async fn start(self: &Arc<Self>) -> Result<()> { +        self.start_listener().await?;          Ok(())      } @@ -233,7 +236,7 @@ impl LookupService {      }      /// Start a listener. -    async fn start_listener(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { +    async fn start_listener(self: &Arc<Self>) -> Result<()> {          let addr = match &self.listen_endpoint {              Some(a) => a.read().await.addr()?.clone(),              None => return Ok(()), @@ -248,7 +251,7 @@ impl LookupService {              Ok(())          }; -        self.listener.start(ex, endpoint.clone(), callback).await?; +        self.listener.start(endpoint.clone(), callback).await?;          Ok(())      } diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7b8e7dc..7d37eec 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -9,7 +9,7 @@ use smol::lock::Mutex;  use karyons_core::{      async_utils::{Backoff, TaskGroup, TaskResult}, -    Executor, +    GlobalExecutor,  };  use karyons_net::{Conn, Endpoint}; @@ -57,7 +57,7 @@ pub struct Discovery {      pub(crate) outbound_slots: Arc<ConnectionSlots>,      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>,      /// Holds the configuration for the P2P network.      config: Arc<Config>, @@ -70,6 +70,7 @@ impl Discovery {          conn_queue: Arc<ConnQueue>,          config: Arc<Config>,          monitor: Arc<Monitor>, +        ex: GlobalExecutor,      ) -> ArcDiscovery {          let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));          let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -77,16 +78,23 @@ impl Discovery {          let table_key = peer_id.0;          let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); -        let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); -        let lookup_service = -            LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); +        let refresh_service = +            RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); +        let lookup_service = LookupService::new( +            peer_id, +            table.clone(), +            config.clone(), +            monitor.clone(), +            ex.clone(), +        );          let connector = Connector::new(              config.max_connect_retries,              outbound_slots.clone(),              monitor.clone(), +            ex.clone(),          ); -        let listener = Listener::new(inbound_slots.clone(), monitor.clone()); +        let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());          Arc::new(Self {              refresh_service: Arc::new(refresh_service), @@ -97,13 +105,13 @@ impl Discovery {              outbound_slots,              connector,              listener, -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(ex),              config,          })      }      /// Start the Discovery -    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { +    pub async fn start(self: &Arc<Self>) -> Result<()> {          // Check if the listen_endpoint is provided, and if so, start a listener.          if let Some(endpoint) = &self.config.listen_endpoint {              // Return an error if the discovery port is set to 0. @@ -113,7 +121,7 @@ impl Discovery {                  ));              } -            let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; +            let resolved_endpoint = self.start_listener(endpoint).await?;              if endpoint.addr()? != resolved_endpoint.addr()? {                  info!("Resolved listen endpoint: {resolved_endpoint}"); @@ -127,19 +135,19 @@ impl Discovery {          }          // Start the lookup service -        self.lookup_service.start(ex.clone()).await?; +        self.lookup_service.start().await?;          // Start the refresh service -        self.refresh_service.start(ex.clone()).await?; +        self.refresh_service.start().await?;          // Attempt to manually connect to peer endpoints provided in the Config.          for endpoint in self.config.peer_endpoints.iter() { -            let _ = self.connect(endpoint, None, ex.clone()).await; +            let _ = self.connect(endpoint, None).await;          }          // Start connect loop          let selfc = self.clone();          self.task_group -            .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { +            .spawn(selfc.connect_loop(), |res| async move {                  if let TaskResult::Completed(Err(err)) = res {                      error!("Connect loop stopped: {err}");                  } @@ -159,18 +167,14 @@ impl Discovery {      }      /// Start a listener and on success, return the resolved endpoint. -    async fn start_listener( -        self: &Arc<Self>, -        endpoint: &Endpoint, -        ex: Executor<'_>, -    ) -> Result<Endpoint> { +    async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {          let selfc = self.clone();          let callback = |conn: Conn| async move {              selfc.conn_queue.handle(conn, ConnDirection::Inbound).await;              Ok(())          }; -        let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; +        let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;          Ok(resolved_endpoint)      } @@ -180,7 +184,7 @@ impl Discovery {      ///      /// This will perform a backoff to prevent getting stuck in the loop      /// if the seeding process couldn't find any peers. -    async fn connect_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { +    async fn connect_loop(self: Arc<Self>) -> Result<()> {          let backoff = Backoff::new(500, self.config.seeding_interval * 1000);          loop {              let random_entry = self.random_entry(PENDING_ENTRY).await; @@ -188,8 +192,7 @@ impl Discovery {                  Some(entry) => {                      backoff.reset();                      let endpoint = Endpoint::Tcp(entry.addr, entry.port); -                    self.connect(&endpoint, Some(entry.key.into()), ex.clone()) -                        .await; +                    self.connect(&endpoint, Some(entry.key.into())).await;                  }                  None => {                      backoff.sleep().await; @@ -200,7 +203,7 @@ impl Discovery {      }      /// Connect to the given endpoint using the connector -    async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>, ex: Executor<'_>) { +    async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {          let selfc = self.clone();          let pid_cloned = pid.clone();          let cback = |conn: Conn| async move { @@ -211,7 +214,7 @@ impl Discovery {              Ok(())          }; -        let res = self.connector.connect_with_cback(ex, endpoint, cback).await; +        let res = self.connector.connect_with_cback(endpoint, cback).await;          if let Some(pid) = &pid {              match res { diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 7582c84..a708261 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -12,7 +12,7 @@ use smol::{  use karyons_core::{      async_utils::{timeout, Backoff, TaskGroup, TaskResult},      utils::{decode, encode}, -    Executor, +    GlobalExecutor,  };  use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; @@ -43,7 +43,10 @@ pub struct RefreshService {      listen_endpoint: Option<RwLock<Endpoint>>,      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>, + +    /// A global executor +    executor: GlobalExecutor,      /// Holds the configuration for the P2P network.      config: Arc<Config>, @@ -58,6 +61,7 @@ impl RefreshService {          config: Arc<Config>,          table: Arc<Mutex<RoutingTable>>,          monitor: Arc<Monitor>, +        executor: GlobalExecutor,      ) -> Self {          let listen_endpoint = config              .listen_endpoint @@ -67,41 +71,36 @@ impl RefreshService {          Self {              table,              listen_endpoint, -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(executor.clone()), +            executor,              config,              monitor,          }      }      /// Start the refresh service -    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { +    pub async fn start(self: &Arc<Self>) -> Result<()> {          if let Some(endpoint) = &self.listen_endpoint {              let endpoint = endpoint.read().await;              let addr = endpoint.addr()?;              let port = self.config.discovery_port;              let selfc = self.clone(); -            self.task_group.spawn( -                ex.clone(), -                selfc.listen_loop(addr.clone(), port), -                |res| async move { +            self.task_group +                .spawn(selfc.listen_loop(addr.clone(), port), |res| async move {                      if let TaskResult::Completed(Err(err)) = res {                          error!("Listen loop stopped: {err}");                      } -                }, -            ); +                });          }          let selfc = self.clone(); -        self.task_group.spawn( -            ex.clone(), -            selfc.refresh_loop(ex.clone()), -            |res| async move { +        self.task_group +            .spawn(selfc.refresh_loop(), |res| async move {                  if let TaskResult::Completed(Err(err)) = res {                      error!("Refresh loop stopped: {err}");                  } -            }, -        ); +            });          Ok(())      } @@ -121,7 +120,7 @@ impl RefreshService {      /// Initiates periodic refreshing of the routing table. This function will      /// select 8 random entries from each bucket in the routing table and start      /// sending Ping messages to the entries. -    async fn refresh_loop(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { +    async fn refresh_loop(self: Arc<Self>) -> Result<()> {          let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));          loop {              timer.next().await; @@ -140,13 +139,14 @@ impl RefreshService {              }              drop(table); -            self.clone().do_refresh(&entries, ex.clone()).await; +            self.clone().do_refresh(&entries).await;          }      }      /// Iterates over the entries and spawns a new task for each entry to      /// initiate a connection attempt to that entry. -    async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry], ex: Executor<'_>) { +    async fn do_refresh(self: Arc<Self>, entries: &[BucketEntry]) { +        let ex = &self.executor;          for chunk in entries.chunks(16) {              let mut tasks = Vec::new();              for bucket_entry in chunk { diff --git a/p2p/src/error.rs b/p2p/src/error.rs index 945e90a..91d2c39 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -2,7 +2,7 @@ use thiserror::Error as ThisError;  pub type Result<T> = std::result::Result<T, Error>; -/// Represents Karyons's p2p Error. +/// Represents karyons's p2p Error.  #[derive(ThisError, Debug)]  pub enum Error {      #[error("IO Error: {0}")] diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index ee92536..f2391f7 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -4,7 +4,7 @@ use log::{error, info, trace};  use karyons_core::{      async_utils::{TaskGroup, TaskResult}, -    Executor, +    GlobalExecutor,  };  use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; @@ -18,7 +18,7 @@ use crate::{  /// Responsible for creating inbound connections with other peers.  pub struct Listener {      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>,      /// Manages available inbound slots.      connection_slots: Arc<ConnectionSlots>, @@ -29,10 +29,14 @@ pub struct Listener {  impl Listener {      /// Creates a new Listener -    pub fn new(connection_slots: Arc<ConnectionSlots>, monitor: Arc<Monitor>) -> Arc<Self> { +    pub fn new( +        connection_slots: Arc<ConnectionSlots>, +        monitor: Arc<Monitor>, +        ex: GlobalExecutor, +    ) -> Arc<Self> {          Arc::new(Self {              connection_slots, -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(ex),              monitor,          })      } @@ -42,15 +46,14 @@ impl Listener {      /// connection to the callback.      ///      /// Returns the resloved listening endpoint. -    pub async fn start<'a, Fut>( +    pub async fn start<Fut>(          self: &Arc<Self>, -        ex: Executor<'a>,          endpoint: Endpoint,          // https://github.com/rust-lang/rfcs/pull/2132 -        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, +        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,      ) -> Result<Endpoint>      where -        Fut: Future<Output = Result<()>> + Send + 'a, +        Fut: Future<Output = Result<()>> + Send + 'static,      {          let listener = match listen(&endpoint).await {              Ok(listener) => { @@ -73,15 +76,12 @@ impl Listener {          info!("Start listening on {endpoint}");          let selfc = self.clone(); -        self.task_group.spawn( -            ex.clone(), -            selfc.listen_loop(ex.clone(), listener, callback), -            |res| async move { +        self.task_group +            .spawn(selfc.listen_loop(listener, callback), |res| async move {                  if let TaskResult::Completed(Err(err)) = res {                      error!("Listen loop stopped: {endpoint} {err}");                  } -            }, -        ); +            });          Ok(resolved_endpoint)      } @@ -90,14 +90,13 @@ impl Listener {          self.task_group.cancel().await;      } -    async fn listen_loop<'a, Fut>( +    async fn listen_loop<Fut>(          self: Arc<Self>, -        ex: Executor<'a>,          listener: Box<dyn NetListener>, -        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, +        callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,      ) -> Result<()>      where -        Fut: Future<Output = Result<()>> + Send + 'a, +        Fut: Future<Output = Result<()>> + Send + 'static,      {          loop {              // Wait for an available inbound slot. @@ -134,8 +133,7 @@ impl Listener {              };              let callback = callback.clone(); -            self.task_group -                .spawn(ex.clone(), callback(conn), on_disconnect); +            self.task_group.spawn(callback(conn), on_disconnect);          }      }  } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index cdb9837..d3691c2 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -12,7 +12,7 @@ pub const MSG_HEADER_SIZE: usize = 6;  /// The maximum allowed size for a message in bytes.  pub const MAX_ALLOWED_MSG_SIZE: u32 = 1000000; -/// Defines the main message in the Karyon P2P network. +/// Defines the main message in the karyon p2p network.  ///  /// This message structure consists of a header and payload, where the header  /// typically contains essential information about the message, and the payload diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index ee0bf44..fbbf43f 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -13,11 +13,19 @@ use karyons_net::Endpoint;  /// # Example  ///  /// ``` +/// use std::sync::Arc; +/// +/// use smol::Executor; +///  /// use karyons_p2p::{Config, Backend, PeerID}; +///  /// async {  ///      -///     let backend = Backend::new(PeerID::random(), Config::default()); -///      +///     // Create a new Executor +///     let ex = Arc::new(Executor::new()); +/// +///     let backend = Backend::new(PeerID::random(), Config::default(), ex); +///  ///     // Create a new Subscription  ///     let sub =  backend.monitor().await;  ///      diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 60e76a1..85cd558 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -14,7 +14,7 @@ use karyons_core::{      async_utils::{select, Either, TaskGroup, TaskResult},      event::{ArcEventSys, EventListener, EventSys},      utils::{decode, encode}, -    Executor, +    GlobalExecutor,  };  use karyons_net::Endpoint; @@ -56,7 +56,7 @@ pub struct Peer {      stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>,  }  impl Peer { @@ -67,6 +67,7 @@ impl Peer {          io_codec: IOCodec,          remote_endpoint: Endpoint,          conn_direction: ConnDirection, +        ex: GlobalExecutor,      ) -> ArcPeer {          Arc::new(Peer {              id: id.clone(), @@ -76,14 +77,14 @@ impl Peer {              remote_endpoint,              conn_direction,              protocol_events: EventSys::new(), -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(ex),              stop_chan: channel::bounded(1),          })      }      /// Run the peer -    pub async fn run(self: Arc<Self>, ex: Executor<'_>) -> Result<()> { -        self.start_protocols(ex.clone()).await; +    pub async fn run(self: Arc<Self>, ex: GlobalExecutor) -> Result<()> { +        self.start_protocols(ex).await;          self.read_loop().await      } @@ -205,7 +206,7 @@ impl Peer {      }      /// Start running the protocols for this peer connection. -    async fn start_protocols(self: &Arc<Self>, ex: Executor<'_>) { +    async fn start_protocols(self: &Arc<Self>, ex: GlobalExecutor) {          for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() {              trace!("peer {} start protocol {protocol_id}", self.id);              let protocol = constructor(self.clone()); @@ -213,7 +214,6 @@ impl Peer {              self.protocol_ids.write().await.push(protocol_id.clone());              let selfc = self.clone(); -            let exc = ex.clone();              let proto_idc = protocol_id.clone();              let on_failure = |result: TaskResult<Result<()>>| async move { @@ -227,7 +227,7 @@ impl Peer {              };              self.task_group -                .spawn(ex.clone(), protocol.start(exc), on_failure); +                .spawn(protocol.start(ex.clone()), on_failure);          }      } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 2433cfc..0d17307 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -13,7 +13,7 @@ use smol::{  use karyons_core::{      async_utils::{TaskGroup, TaskResult},      utils::decode, -    Executor, +    GlobalExecutor,  };  use karyons_net::Conn; @@ -51,10 +51,13 @@ pub struct PeerPool {      protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>,      /// Managing spawned tasks. -    task_group: TaskGroup, +    task_group: TaskGroup<'static>, + +    /// A global Executor +    executor: GlobalExecutor,      /// The Configuration for the P2P network. -    pub config: Arc<Config>, +    pub(crate) config: Arc<Config>,      /// Responsible for network and system monitoring.      monitor: Arc<Monitor>, @@ -67,6 +70,7 @@ impl PeerPool {          conn_queue: Arc<ConnQueue>,          config: Arc<Config>,          monitor: Arc<Monitor>, +        executor: GlobalExecutor,      ) -> Arc<Self> {          let protocols = RwLock::new(HashMap::new());          let protocol_versions = Arc::new(RwLock::new(HashMap::new())); @@ -77,23 +81,23 @@ impl PeerPool {              peers: Mutex::new(HashMap::new()),              protocols,              protocol_versions, -            task_group: TaskGroup::new(), +            task_group: TaskGroup::new(executor.clone()), +            executor,              monitor,              config,          })      }      /// Start -    pub async fn start(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { +    pub async fn start(self: &Arc<Self>) -> Result<()> {          self.setup_protocols().await?;          let selfc = self.clone(); -        self.task_group -            .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {}); +        self.task_group.spawn(selfc.listen_loop(), |_| async {});          Ok(())      }      /// Listens to a new connection from the connection queue -    pub async fn listen_loop(self: Arc<Self>, ex: Executor<'_>) { +    pub async fn listen_loop(self: Arc<Self>) {          loop {              let new_conn = self.conn_queue.next().await;              let disconnect_signal = new_conn.disconnect_signal; @@ -103,7 +107,6 @@ impl PeerPool {                      new_conn.conn,                      &new_conn.direction,                      disconnect_signal.clone(), -                    ex.clone(),                  )                  .await; @@ -128,7 +131,7 @@ impl PeerPool {          let protocols = &mut self.protocols.write().await;          protocol_versions.insert(P::id(), P::version()?); -        protocols.insert(P::id(), Box::new(c) as Box<ProtocolConstructor>); +        protocols.insert(P::id(), c);          Ok(())      } @@ -153,7 +156,6 @@ impl PeerPool {          conn: Conn,          conn_direction: &ConnDirection,          disconnect_signal: Sender<()>, -        ex: Executor<'_>,      ) -> Result<PeerID> {          let endpoint = conn.peer_endpoint()?;          let io_codec = IOCodec::new(conn); @@ -173,6 +175,7 @@ impl PeerPool {              io_codec,              endpoint.clone(),              conn_direction.clone(), +            self.executor.clone(),          );          // Insert the new peer @@ -190,7 +193,7 @@ impl PeerPool {          };          self.task_group -            .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect); +            .spawn(peer.run(self.executor.clone()), on_disconnect);          info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 515efc6..770b695 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -87,8 +87,11 @@ impl EventValue for ProtocolEvent {  ///     let peer_id = PeerID::random();  ///     let config = Config::default();  /// +///     // Create a new Executor +///     let ex = Arc::new(Executor::new()); +///  ///     // Create a new Backend -///     let backend = Backend::new(peer_id, config); +///     let backend = Backend::new(peer_id, config, ex);  ///  ///     // Attach the NewProtocol  ///     let c = move |peer| NewProtocol::new(peer); diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index b337494..dc1b9a1 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -39,7 +39,6 @@ pub struct PingProtocol {      peer: ArcPeer,      ping_interval: u64,      ping_timeout: u64, -    task_group: TaskGroup,  }  impl PingProtocol { @@ -51,7 +50,6 @@ impl PingProtocol {              peer,              ping_interval,              ping_timeout, -            task_group: TaskGroup::new(),          })      } @@ -130,12 +128,14 @@ impl PingProtocol {  impl Protocol for PingProtocol {      async fn start(self: Arc<Self>, ex: Executor<'_>) -> Result<()> {          trace!("Start Ping protocol"); + +        let task_group = TaskGroup::new(ex); +          let (pong_chan, pong_chan_recv) = channel::bounded(1);          let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1);          let selfc = self.clone(); -        self.task_group.spawn( -            ex.clone(), +        task_group.spawn(              selfc.clone().ping_loop(pong_chan_recv.clone()),              |res| async move {                  if let TaskResult::Completed(result) = res { @@ -148,7 +148,7 @@ impl Protocol for PingProtocol {          let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await;          listener.cancel().await; -        self.task_group.cancel().await; +        task_group.cancel().await;          match result {              Either::Left(res) => { diff --git a/p2p/src/utils/version.rs b/p2p/src/utils/version.rs index 4986495..a101b28 100644 --- a/p2p/src/utils/version.rs +++ b/p2p/src/utils/version.rs @@ -5,7 +5,7 @@ use semver::VersionReq;  use crate::{Error, Result}; -/// Represents the network version and protocol version used in Karyons p2p. +/// Represents the network version and protocol version used in karyons p2p.  ///  /// # Example  /// | 
