aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-04-11 10:19:20 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-19 13:51:30 +0200
commit0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch)
tree961d73218af672797d49f899289bef295bc56493 /p2p/src/discovery
parenta69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff)
add support for tokio & improve net crate api
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/lookup.rs71
-rw-r--r--p2p/src/discovery/mod.rs45
-rw-r--r--p2p/src/discovery/refresh.rs92
3 files changed, 109 insertions, 99 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index c81fbc6..cff4610 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -3,10 +3,13 @@ use std::{sync::Arc, time::Duration};
use futures_util::{stream::FuturesUnordered, StreamExt};
use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
-use smol::lock::{Mutex, RwLock};
use karyon_core::{
- async_util::{timeout, Executor},
+ async_runtime::{
+ lock::{Mutex, RwLock},
+ Executor,
+ },
+ async_util::timeout,
crypto::KeyPair,
util::decode,
};
@@ -14,7 +17,6 @@ use karyon_core::{
use karyon_net::{Conn, Endpoint};
use crate::{
- codec::Codec,
connector::Connector,
listener::Listener,
message::{
@@ -64,7 +66,7 @@ impl LookupService {
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: Executor<'static>,
+ ex: Executor,
) -> Self {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
@@ -228,8 +230,7 @@ impl LookupService {
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
- let io_codec = Codec::new(conn);
- let result = self.handle_outbound(io_codec, target_peer_id).await;
+ let result = self.handle_outbound(conn, target_peer_id).await;
self.monitor
.notify(&ConnEvent::Disconnected(endpoint).into())
@@ -242,14 +243,14 @@ impl LookupService {
/// Handles outbound connection
async fn handle_outbound(
&self,
- io_codec: Codec,
+ conn: Conn<NetMsg>,
target_peer_id: &PeerID,
) -> Result<Vec<PeerMsg>> {
trace!("Send Ping msg");
- self.send_ping_msg(&io_codec).await?;
+ self.send_ping_msg(&conn).await?;
trace!("Send FindPeer msg");
- let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?;
+ let peers = self.send_findpeer_msg(&conn, target_peer_id).await?;
if peers.0.len() >= MAX_PEERS_IN_PEERSMSG {
return Err(Error::Lookup("Received too many peers in PeersMsg"));
@@ -257,12 +258,12 @@ impl LookupService {
trace!("Send Peer msg");
if let Some(endpoint) = &self.listen_endpoint {
- self.send_peer_msg(&io_codec, endpoint.read().await.clone())
+ self.send_peer_msg(&conn, endpoint.read().await.clone())
.await?;
}
trace!("Send Shutdown msg");
- self.send_shutdown_msg(&io_codec).await?;
+ self.send_shutdown_msg(&conn).await?;
Ok(peers.0)
}
@@ -277,7 +278,7 @@ impl LookupService {
let endpoint = Endpoint::Tcp(addr, self.config.discovery_port);
let selfc = self.clone();
- let callback = |conn: Conn| async move {
+ let callback = |conn: Conn<NetMsg>| async move {
let t = Duration::from_secs(selfc.config.lookup_connection_lifespan);
timeout(t, selfc.handle_inbound(conn)).await??;
Ok(())
@@ -288,10 +289,9 @@ impl LookupService {
}
/// Handles inbound connection
- async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> {
- let io_codec = Codec::new(conn);
+ async fn handle_inbound(self: &Arc<Self>, conn: Conn<NetMsg>) -> Result<()> {
loop {
- let msg: NetMsg = io_codec.read().await?;
+ let msg: NetMsg = conn.recv().await?;
trace!("Receive msg {:?}", msg.header.command);
if let NetMsgCmd::Shutdown = msg.header.command {
@@ -304,12 +304,12 @@ impl LookupService {
if !version_match(&self.config.version.req, &ping_msg.version) {
return Err(Error::IncompatibleVersion("system: {}".into()));
}
- self.send_pong_msg(ping_msg.nonce, &io_codec).await?;
+ self.send_pong_msg(ping_msg.nonce, &conn).await?;
}
NetMsgCmd::FindPeer => {
let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
let peer_id = findpeer_msg.0;
- self.send_peers_msg(&peer_id, &io_codec).await?;
+ self.send_peers_msg(&peer_id, &conn).await?;
}
NetMsgCmd::Peer => {
let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
@@ -322,7 +322,7 @@ impl LookupService {
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> {
+ async fn send_ping_msg(&self, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Pong msg");
let mut nonce: [u8; 32] = [0; 32];
@@ -332,10 +332,10 @@ impl LookupService {
version: self.config.version.v.clone(),
nonce,
};
- io_codec.write(NetMsgCmd::Ping, &ping_msg).await?;
+ conn.send(NetMsg::new(NetMsgCmd::Ping, &ping_msg)?).await?;
let t = Duration::from_secs(self.config.lookup_response_timeout);
- let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
+ let recv_msg: NetMsg = timeout(t, conn.recv()).await??;
let payload = get_msg_payload!(Pong, recv_msg);
let (pong_msg, _) = decode::<PongMsg>(&payload)?;
@@ -348,21 +348,24 @@ impl LookupService {
}
/// Sends a Pong msg
- async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> {
+ async fn send_pong_msg(&self, nonce: [u8; 32], conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Pong msg");
- io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?;
+ conn.send(NetMsg::new(NetMsgCmd::Pong, &PongMsg(nonce))?)
+ .await?;
Ok(())
}
/// Sends a FindPeer msg and wait to receivet the Peers msg.
- async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> {
+ async fn send_findpeer_msg(&self, conn: &Conn<NetMsg>, peer_id: &PeerID) -> Result<PeersMsg> {
trace!("Send FindPeer msg");
- io_codec
- .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone()))
- .await?;
+ conn.send(NetMsg::new(
+ NetMsgCmd::FindPeer,
+ &FindPeerMsg(peer_id.clone()),
+ )?)
+ .await?;
let t = Duration::from_secs(self.config.lookup_response_timeout);
- let recv_msg: NetMsg = io_codec.read_timeout(t).await?;
+ let recv_msg: NetMsg = timeout(t, conn.recv()).await??;
let payload = get_msg_payload!(Peers, recv_msg);
let (peers, _) = decode(&payload)?;
@@ -371,19 +374,20 @@ impl LookupService {
}
/// Sends a Peers msg.
- async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> {
+ async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Peers msg");
let table = self.table.lock().await;
let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
drop(table);
let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
- io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?;
+ conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?)
+ .await?;
Ok(())
}
/// Sends a Peer msg.
- async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> {
+ async fn send_peer_msg(&self, conn: &Conn<NetMsg>, endpoint: Endpoint) -> Result<()> {
trace!("Send Peer msg");
let peer_msg = PeerMsg {
addr: endpoint.addr()?.clone(),
@@ -391,14 +395,15 @@ impl LookupService {
discovery_port: self.config.discovery_port,
peer_id: self.id.clone(),
};
- io_codec.write(NetMsgCmd::Peer, &peer_msg).await?;
+ conn.send(NetMsg::new(NetMsgCmd::Peer, &peer_msg)?).await?;
Ok(())
}
/// Sends a Shutdown msg.
- async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> {
+ async fn send_shutdown_msg(&self, conn: &Conn<NetMsg>) -> Result<()> {
trace!("Send Shutdown msg");
- io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?;
+ conn.send(NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0))?)
+ .await?;
Ok(())
}
}
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 3e437aa..19ae77a 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -5,10 +5,10 @@ use std::sync::Arc;
use log::{error, info};
use rand::{rngs::OsRng, seq::SliceRandom};
-use smol::lock::Mutex;
use karyon_core::{
- async_util::{Backoff, Executor, TaskGroup, TaskResult},
+ async_runtime::{lock::Mutex, Executor},
+ async_util::{Backoff, TaskGroup, TaskResult},
crypto::KeyPair,
};
@@ -19,6 +19,7 @@ use crate::{
connection::{ConnDirection, ConnQueue},
connector::Connector,
listener::Listener,
+ message::NetMsg,
monitor::Monitor,
routing_table::{
Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY,
@@ -45,6 +46,7 @@ pub struct Discovery {
/// Connector
connector: Arc<Connector>,
+
/// Listener
listener: Arc<Listener>,
@@ -53,11 +55,12 @@ pub struct Discovery {
/// Inbound slots.
pub(crate) inbound_slots: Arc<ConnectionSlots>,
+
/// Outbound slots.
pub(crate) outbound_slots: Arc<ConnectionSlots>,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -71,7 +74,7 @@ impl Discovery {
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
monitor: Arc<Monitor>,
- ex: Executor<'static>,
+ ex: Executor,
) -> ArcDiscovery {
let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
@@ -180,7 +183,7 @@ impl Discovery {
/// Start a listener and on success, return the resolved endpoint.
async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
let selfc = self.clone();
- let callback = |c: Conn| async move {
+ let callback = |c: Conn<NetMsg>| async move {
selfc.conn_queue.handle(c, ConnDirection::Inbound).await?;
Ok(())
};
@@ -198,8 +201,8 @@ impl Discovery {
async fn connect_loop(self: Arc<Self>) -> Result<()> {
let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
loop {
- let random_entry = self.random_entry(PENDING_ENTRY).await;
- match random_entry {
+ let random_table_entry = self.random_table_entry(PENDING_ENTRY).await;
+ match random_table_entry {
Some(entry) => {
backoff.reset();
let endpoint = Endpoint::Tcp(entry.addr, entry.port);
@@ -218,7 +221,7 @@ impl Discovery {
let selfc = self.clone();
let pid_c = pid.clone();
let endpoint_c = endpoint.clone();
- let cback = |conn: Conn| async move {
+ let cback = |conn: Conn<NetMsg>| async move {
let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await;
// If the entry is not in the routing table, ignore the result
@@ -230,17 +233,17 @@ impl Discovery {
match result {
Err(Error::IncompatiblePeer) => {
error!("Failed to do handshake: {endpoint_c} incompatible peer");
- selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
+ selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
- // TODO: Use the appropriate status.
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ // TODO: Use an appropriate status.
+ selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
- selfc.update_entry(&pid, UNSTABLE_ENTRY).await;
+ selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await;
}
Ok(_) => {
- selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
+ selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await;
}
}
@@ -255,10 +258,10 @@ impl Discovery {
if let Some(pid) = &pid {
match result {
Ok(_) => {
- self.update_entry(pid, CONNECTED_ENTRY).await;
+ self.update_table_entry(pid, CONNECTED_ENTRY).await;
}
Err(_) => {
- self.update_entry(pid, UNREACHABLE_ENTRY).await;
+ self.update_table_entry(pid, UNREACHABLE_ENTRY).await;
}
}
}
@@ -271,12 +274,16 @@ impl Discovery {
/// table doesn't have an available entry, it will connect to one of the
/// provided bootstrap endpoints in the `Config` and initiate the lookup.
async fn start_seeding(&self) {
- match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
+ match self
+ .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY)
+ .await
+ {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
let peer_id = Some(entry.key.into());
if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
- self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
+ self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY)
+ .await;
error!("Failed to do lookup: {endpoint}: {err}");
}
}
@@ -292,12 +299,12 @@ impl Discovery {
}
/// Returns a random entry from routing table.
- async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
+ async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
self.table.lock().await.random_entry(entry_flag).cloned()
}
/// Update the entry status
- async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
+ async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) {
let table = &mut self.table.lock().await;
table.update_entry(&pid.0, entry_flag);
}
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 035a581..0c49ac2 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -3,31 +3,28 @@ use std::{sync::Arc, time::Duration};
use bincode::{Decode, Encode};
use log::{error, info, trace};
use rand::{rngs::OsRng, RngCore};
-use smol::{
- lock::{Mutex, RwLock},
- stream::StreamExt,
- Timer,
-};
use karyon_core::{
- async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult},
- util::{decode, encode},
+ async_runtime::{
+ lock::{Mutex, RwLock},
+ Executor,
+ },
+ async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult},
};
-use karyon_net::{udp, Connection, Endpoint, NetError};
-
-/// Maximum failures for an entry before removing it from the routing table.
-pub const MAX_FAILURES: u32 = 3;
-
-/// Ping message size
-const PINGMSG_SIZE: usize = 32;
+use karyon_net::{udp, Connection, Endpoint, Error as NetError};
use crate::{
+ codec::RefreshMsgCodec,
+ message::RefreshMsg,
monitor::{ConnEvent, DiscoveryEvent, Monitor},
routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY},
Config, Error, Result,
};
+/// Maximum failures for an entry before removing it from the routing table.
+pub const MAX_FAILURES: u32 = 3;
+
#[derive(Decode, Encode, Debug, Clone)]
pub struct PingMsg(pub [u8; 32]);
@@ -42,10 +39,10 @@ pub struct RefreshService {
listen_endpoint: Option<RwLock<Endpoint>>,
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
/// A global executor
- executor: Executor<'static>,
+ executor: Executor,
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -60,7 +57,7 @@ impl RefreshService {
config: Arc<Config>,
table: Arc<Mutex<RoutingTable>>,
monitor: Arc<Monitor>,
- executor: Executor<'static>,
+ executor: Executor,
) -> Self {
let listen_endpoint = config
.listen_endpoint
@@ -118,9 +115,8 @@ impl RefreshService {
/// selects the first 8 entries (oldest entries) from each bucket in the
/// routing table and starts sending Ping messages to the collected entries.
async fn refresh_loop(self: Arc<Self>) -> Result<()> {
- let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval));
loop {
- timer.next().await;
+ sleep(Duration::from_secs(self.config.refresh_interval)).await;
trace!("Start refreshing the routing table...");
self.monitor
@@ -162,7 +158,7 @@ impl RefreshService {
}
for task in tasks {
- task.await;
+ let _ = task.await;
}
}
}
@@ -193,10 +189,10 @@ impl RefreshService {
async fn connect(&self, entry: &Entry) -> Result<()> {
let mut retry = 0;
let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port);
- let conn = udp::dial(&endpoint).await?;
+ let conn = udp::dial(&endpoint, Default::default(), RefreshMsgCodec {}).await?;
let backoff = Backoff::new(100, 5000);
while retry < self.config.refresh_connect_retries {
- match self.send_ping_msg(&conn).await {
+ match self.send_ping_msg(&conn, &endpoint).await {
Ok(()) => return Ok(()),
Err(Error::KaryonNet(NetError::Timeout)) => {
retry += 1;
@@ -214,7 +210,7 @@ impl RefreshService {
/// Set up a UDP listener and start listening for Ping messages from other
/// peers.
async fn listen_loop(self: Arc<Self>, endpoint: Endpoint) -> Result<()> {
- let conn = match udp::listen(&endpoint).await {
+ let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {
Ok(c) => {
self.monitor
.notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -240,46 +236,48 @@ impl RefreshService {
}
/// Listen to receive a Ping message and respond with a Pong message.
- async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
- let mut buf = [0; PINGMSG_SIZE];
- let (_, endpoint) = conn.recv_from(&mut buf).await?;
-
+ async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {
+ let (msg, endpoint) = conn.recv().await?;
self.monitor
.notify(&ConnEvent::Accepted(endpoint.clone()).into())
.await;
- let (ping_msg, _) = decode::<PingMsg>(&buf)?;
-
- let pong_msg = PongMsg(ping_msg.0);
- let buffer = encode(&pong_msg)?;
-
- conn.send_to(&buffer, &endpoint).await?;
+ match msg {
+ RefreshMsg::Ping(m) => {
+ let pong_msg = RefreshMsg::Pong(m);
+ conn.send((pong_msg, endpoint.clone())).await?;
+ }
+ RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())),
+ }
self.monitor
- .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
+ .notify(&ConnEvent::Disconnected(endpoint).into())
.await;
Ok(())
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> {
+ async fn send_ping_msg(
+ &self,
+ conn: &udp::UdpConn<RefreshMsgCodec>,
+ endpoint: &Endpoint,
+ ) -> Result<()> {
let mut nonce: [u8; 32] = [0; 32];
RngCore::fill_bytes(&mut OsRng, &mut nonce);
+ conn.send((RefreshMsg::Ping(nonce), endpoint.clone()))
+ .await?;
- let ping_msg = PingMsg(nonce);
- let buffer = encode(&ping_msg)?;
- conn.write(&buffer).await?;
-
- let buf = &mut [0; PINGMSG_SIZE];
let t = Duration::from_secs(self.config.refresh_response_timeout);
- timeout(t, conn.read(buf)).await??;
-
- let (pong_msg, _) = decode::<PongMsg>(buf)?;
+ let (msg, _) = timeout(t, conn.recv()).await??;
- if ping_msg.0 != pong_msg.0 {
- return Err(Error::InvalidPongMsg);
+ match msg {
+ RefreshMsg::Pong(n) => {
+ if n != nonce {
+ return Err(Error::InvalidPongMsg);
+ }
+ Ok(())
+ }
+ _ => Err(Error::InvalidMsg("Unexpected ping msg".into())),
}
-
- Ok(())
}
}