From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- p2p/src/discovery/lookup.rs | 366 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 p2p/src/discovery/lookup.rs (limited to 'p2p/src/discovery/lookup.rs') diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs new file mode 100644 index 0000000..f404133 --- /dev/null +++ b/p2p/src/discovery/lookup.rs @@ -0,0 +1,366 @@ +use std::{sync::Arc, time::Duration}; + +use futures_util::{stream::FuturesUnordered, StreamExt}; +use log::{error, trace}; +use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; +use smol::lock::{Mutex, RwLock}; + +use karyons_core::{async_utils::timeout, utils::decode, Executor}; + +use karyons_net::{Conn, Endpoint}; + +use crate::{ + io_codec::IOCodec, + message::{ + get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, + ShutdownMsg, + }, + monitor::{ConnEvent, DiscoveryEvent, Monitor}, + net::{ConnectionSlots, Connector, Listener}, + routing_table::RoutingTable, + utils::version_match, + Config, Error, PeerID, Result, +}; + +/// Maximum number of peers that can be returned in a PeersMsg. +pub const MAX_PEERS_IN_PEERSMSG: usize = 10; + +pub struct LookupService { + /// Peer's ID + id: PeerID, + + /// Routing Table + table: Arc>, + + /// Listener + listener: Arc, + /// Connector + connector: Arc, + + /// Outbound slots. + outbound_slots: Arc, + + /// Resolved listen endpoint + listen_endpoint: Option>, + + /// Holds the configuration for the P2P network. + config: Arc, + + /// Responsible for network and system monitoring. + monitor: Arc, +} + +impl LookupService { + /// Creates a new lookup service + pub fn new( + id: &PeerID, + table: Arc>, + config: Arc, + monitor: Arc, + ) -> Self { + let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); + let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); + + let listener = Listener::new(inbound_slots.clone(), monitor.clone()); + let connector = Connector::new( + config.lookup_connect_retries, + outbound_slots.clone(), + monitor.clone(), + ); + + let listen_endpoint = config + .listen_endpoint + .as_ref() + .map(|endpoint| RwLock::new(endpoint.clone())); + + Self { + id: id.clone(), + table, + listener, + connector, + outbound_slots, + listen_endpoint, + config, + monitor, + } + } + + /// Start the lookup service. + pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { + self.start_listener(ex).await?; + Ok(()) + } + + /// Set the resolved listen endpoint. + pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { + if let Some(endpoint) = &self.listen_endpoint { + *endpoint.write().await = resolved_endpoint.clone(); + } + } + + /// Shuts down the lookup service. + pub async fn shutdown(&self) { + self.connector.shutdown().await; + self.listener.shutdown().await; + } + + /// Starts iterative lookup and populate the routing table. + /// + /// This method begins by generating a random peer ID and connecting to the + /// provided endpoint. It then sends a FindPeer message containing the + /// randomly generated peer ID. Upon receiving peers from the initial lookup, + /// it starts connecting to these received peers and sends them a FindPeer + /// message that contains our own peer ID. + pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> { + trace!("Lookup started {endpoint}"); + self.monitor + .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) + .await; + + let mut random_peers = vec![]; + if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await { + self.monitor + .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) + .await; + return Err(err); + }; + + let mut peer_buffer = vec![]; + self.self_lookup(&random_peers, &mut peer_buffer).await; + + while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG { + match random_peers.pop() { + Some(p) => peer_buffer.push(p), + None => break, + } + } + + for peer in peer_buffer.iter() { + let mut table = self.table.lock().await; + let result = table.add_entry(peer.clone().into()); + trace!("Add entry {:?}", result); + } + + self.monitor + .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) + .await; + + Ok(()) + } + + /// Starts a random lookup + /// + /// This will perfom lookup on a random generated PeerID + async fn random_lookup( + &self, + endpoint: &Endpoint, + random_peers: &mut Vec, + ) -> Result<()> { + for _ in 0..2 { + let peer_id = PeerID::random(); + let peers = self.connect(&peer_id, endpoint.clone()).await?; + for peer in peers { + if random_peers.contains(&peer) + || peer.peer_id == self.id + || self.table.lock().await.contains_key(&peer.peer_id.0) + { + continue; + } + + random_peers.push(peer); + } + } + + Ok(()) + } + + /// Starts a self lookup + async fn self_lookup(&self, random_peers: &Vec, peer_buffer: &mut Vec) { + let mut tasks = FuturesUnordered::new(); + for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { + let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); + tasks.push(self.connect(&self.id, endpoint)) + } + + while let Some(result) = tasks.next().await { + match result { + Ok(peers) => peer_buffer.extend(peers), + Err(err) => { + error!("Failed to do self lookup: {err}"); + } + } + } + } + + /// Connects to the given endpoint + async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result> { + let conn = self.connector.connect(&endpoint).await?; + let io_codec = IOCodec::new(conn); + let result = self.handle_outbound(io_codec, peer_id).await; + + self.monitor + .notify(&ConnEvent::Disconnected(endpoint).into()) + .await; + self.outbound_slots.remove().await; + + result + } + + /// Handles outbound connection + async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result> { + trace!("Send Ping msg"); + self.send_ping_msg(&io_codec).await?; + + trace!("Send FindPeer msg"); + let peers = self.send_findpeer_msg(&io_codec, peer_id).await?; + + if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { + return Err(Error::Lookup("Received too many peers in PeersMsg")); + } + + trace!("Send Peer msg"); + if let Some(endpoint) = &self.listen_endpoint { + self.send_peer_msg(&io_codec, endpoint.read().await.clone()) + .await?; + } + + trace!("Send Shutdown msg"); + self.send_shutdown_msg(&io_codec).await?; + + Ok(peers.0) + } + + /// Start a listener. + async fn start_listener(self: &Arc, ex: Executor<'_>) -> Result<()> { + let addr = match &self.listen_endpoint { + Some(a) => a.read().await.addr()?.clone(), + None => return Ok(()), + }; + + let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); + + let selfc = self.clone(); + let callback = |conn: Conn| async move { + let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); + timeout(t, selfc.handle_inbound(conn)).await??; + Ok(()) + }; + + self.listener.start(ex, endpoint.clone(), callback).await?; + Ok(()) + } + + /// Handles inbound connection + async fn handle_inbound(self: &Arc, conn: Conn) -> Result<()> { + let io_codec = IOCodec::new(conn); + loop { + let msg: NetMsg = io_codec.read().await?; + trace!("Receive msg {:?}", msg.header.command); + + if let NetMsgCmd::Shutdown = msg.header.command { + return Ok(()); + } + + match &msg.header.command { + NetMsgCmd::Ping => { + let (ping_msg, _) = decode::(&msg.payload)?; + if !version_match(&self.config.version.req, &ping_msg.version) { + return Err(Error::IncompatibleVersion("system: {}".into())); + } + self.send_pong_msg(ping_msg.nonce, &io_codec).await?; + } + NetMsgCmd::FindPeer => { + let (findpeer_msg, _) = decode::(&msg.payload)?; + let peer_id = findpeer_msg.0; + self.send_peers_msg(&peer_id, &io_codec).await?; + } + NetMsgCmd::Peer => { + let (peer, _) = decode::(&msg.payload)?; + let result = self.table.lock().await.add_entry(peer.clone().into()); + trace!("Add entry result: {:?}", result); + } + c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), + } + } + } + + /// Sends a Ping msg and wait to receive the Pong message. + async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> { + trace!("Send Pong msg"); + + let mut nonce: [u8; 32] = [0; 32]; + RngCore::fill_bytes(&mut OsRng, &mut nonce); + + let ping_msg = PingMsg { + version: self.config.version.v.clone(), + nonce, + }; + io_codec.write(NetMsgCmd::Ping, &ping_msg).await?; + + let t = Duration::from_secs(self.config.lookup_response_timeout); + let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + + let payload = get_msg_payload!(Pong, recv_msg); + let (pong_msg, _) = decode::(&payload)?; + + if ping_msg.nonce != pong_msg.0 { + return Err(Error::InvalidPongMsg); + } + + Ok(()) + } + + /// Sends a Pong msg + async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> { + trace!("Send Pong msg"); + io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; + Ok(()) + } + + /// Sends a FindPeer msg and wait to receivet the Peers msg. + async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result { + trace!("Send FindPeer msg"); + io_codec + .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) + .await?; + + let t = Duration::from_secs(self.config.lookup_response_timeout); + let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + + let payload = get_msg_payload!(Peers, recv_msg); + let (peers, _) = decode(&payload)?; + + Ok(peers) + } + + /// Sends a Peers msg. + async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> { + trace!("Send Peers msg"); + let table = self.table.lock().await; + let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); + let peers: Vec = entries.into_iter().map(|e| e.into()).collect(); + drop(table); + io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?; + Ok(()) + } + + /// Sends a Peer msg. + async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> { + trace!("Send Peer msg"); + let peer_msg = PeerMsg { + addr: endpoint.addr()?.clone(), + port: *endpoint.port()?, + discovery_port: self.config.discovery_port, + peer_id: self.id.clone(), + }; + io_codec.write(NetMsgCmd::Peer, &peer_msg).await?; + Ok(()) + } + + /// Sends a Shutdown msg. + async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> { + trace!("Send Shutdown msg"); + io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; + Ok(()) + } +} -- cgit v1.2.3