diff options
author | hozan23 <hozan23@karyontech.net> | 2024-04-11 10:19:20 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-05-19 13:51:30 +0200 |
commit | 0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch) | |
tree | 961d73218af672797d49f899289bef295bc56493 /p2p/src/discovery/mod.rs | |
parent | a69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff) |
add support for tokio & improve net crate api
Diffstat (limited to 'p2p/src/discovery/mod.rs')
-rw-r--r-- | p2p/src/discovery/mod.rs | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 3e437aa..19ae77a 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -5,10 +5,10 @@ use std::sync::Arc; use log::{error, info}; use rand::{rngs::OsRng, seq::SliceRandom}; -use smol::lock::Mutex; use karyon_core::{ - async_util::{Backoff, Executor, TaskGroup, TaskResult}, + async_runtime::{lock::Mutex, Executor}, + async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; @@ -19,6 +19,7 @@ use crate::{ connection::{ConnDirection, ConnQueue}, connector::Connector, listener::Listener, + message::NetMsg, monitor::Monitor, routing_table::{ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, @@ -45,6 +46,7 @@ pub struct Discovery { /// Connector connector: Arc<Connector>, + /// Listener listener: Arc<Listener>, @@ -53,11 +55,12 @@ pub struct Discovery { /// Inbound slots. pub(crate) inbound_slots: Arc<ConnectionSlots>, + /// Outbound slots. pub(crate) outbound_slots: Arc<ConnectionSlots>, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -71,7 +74,7 @@ impl Discovery { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -180,7 +183,7 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); - let callback = |c: Conn| async move { + let callback = |c: Conn<NetMsg>| async move { selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; Ok(()) }; @@ -198,8 +201,8 @@ impl Discovery { async fn connect_loop(self: Arc<Self>) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { - let random_entry = self.random_entry(PENDING_ENTRY).await; - match random_entry { + let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; + match random_table_entry { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -218,7 +221,7 @@ impl Discovery { let selfc = self.clone(); let pid_c = pid.clone(); let endpoint_c = endpoint.clone(); - let cback = |conn: Conn| async move { + let cback = |conn: Conn<NetMsg>| async move { let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; // If the entry is not in the routing table, ignore the result @@ -230,17 +233,17 @@ impl Discovery { match result { Err(Error::IncompatiblePeer) => { error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; + selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; } Err(Error::PeerAlreadyConnected) => { - // TODO: Use the appropriate status. - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + // TODO: Use an appropriate status. + selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; } Err(_) => { - selfc.update_entry(&pid, UNSTABLE_ENTRY).await; + selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; } Ok(_) => { - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; } } @@ -255,10 +258,10 @@ impl Discovery { if let Some(pid) = &pid { match result { Ok(_) => { - self.update_entry(pid, CONNECTED_ENTRY).await; + self.update_table_entry(pid, CONNECTED_ENTRY).await; } Err(_) => { - self.update_entry(pid, UNREACHABLE_ENTRY).await; + self.update_table_entry(pid, UNREACHABLE_ENTRY).await; } } } @@ -271,12 +274,16 @@ impl Discovery { /// table doesn't have an available entry, it will connect to one of the /// provided bootstrap endpoints in the `Config` and initiate the lookup. async fn start_seeding(&self) { - match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { + match self + .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) + .await + { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); let peer_id = Some(entry.key.into()); if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { - self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; + self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) + .await; error!("Failed to do lookup: {endpoint}: {err}"); } } @@ -292,12 +299,12 @@ impl Discovery { } /// Returns a random entry from routing table. - async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { + async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { self.table.lock().await.random_entry(entry_flag).cloned() } /// Update the entry status - async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { + async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { let table = &mut self.table.lock().await; table.update_entry(&pid.0, entry_flag); } |