From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- karyons_p2p/src/backend.rs | 139 ---------- karyons_p2p/src/config.rs | 105 -------- karyons_p2p/src/discovery/lookup.rs | 366 ------------------------- karyons_p2p/src/discovery/mod.rs | 262 ------------------ karyons_p2p/src/discovery/refresh.rs | 289 -------------------- karyons_p2p/src/error.rs | 82 ------ karyons_p2p/src/io_codec.rs | 102 ------- karyons_p2p/src/lib.rs | 27 -- karyons_p2p/src/message.rs | 133 --------- karyons_p2p/src/monitor.rs | 154 ----------- karyons_p2p/src/net/connection_queue.rs | 52 ---- karyons_p2p/src/net/connector.rs | 125 --------- karyons_p2p/src/net/listener.rs | 142 ---------- karyons_p2p/src/net/mod.rs | 27 -- karyons_p2p/src/net/slots.rs | 54 ---- karyons_p2p/src/peer/mod.rs | 237 ---------------- karyons_p2p/src/peer/peer_id.rs | 41 --- karyons_p2p/src/peer_pool.rs | 337 ----------------------- karyons_p2p/src/protocol.rs | 113 -------- karyons_p2p/src/protocols/mod.rs | 3 - karyons_p2p/src/protocols/ping.rs | 173 ------------ karyons_p2p/src/routing_table/bucket.rs | 123 --------- karyons_p2p/src/routing_table/entry.rs | 41 --- karyons_p2p/src/routing_table/mod.rs | 461 -------------------------------- karyons_p2p/src/utils/mod.rs | 21 -- karyons_p2p/src/utils/version.rs | 93 ------- 26 files changed, 3702 deletions(-) delete mode 100644 karyons_p2p/src/backend.rs delete mode 100644 karyons_p2p/src/config.rs delete mode 100644 karyons_p2p/src/discovery/lookup.rs delete mode 100644 karyons_p2p/src/discovery/mod.rs delete mode 100644 karyons_p2p/src/discovery/refresh.rs delete mode 100644 karyons_p2p/src/error.rs delete mode 100644 karyons_p2p/src/io_codec.rs delete mode 100644 karyons_p2p/src/lib.rs delete mode 100644 karyons_p2p/src/message.rs delete mode 100644 karyons_p2p/src/monitor.rs delete mode 100644 karyons_p2p/src/net/connection_queue.rs delete mode 100644 karyons_p2p/src/net/connector.rs delete mode 100644 karyons_p2p/src/net/listener.rs delete mode 100644 karyons_p2p/src/net/mod.rs delete mode 100644 karyons_p2p/src/net/slots.rs delete mode 100644 karyons_p2p/src/peer/mod.rs delete mode 100644 karyons_p2p/src/peer/peer_id.rs delete mode 100644 karyons_p2p/src/peer_pool.rs delete mode 100644 karyons_p2p/src/protocol.rs delete mode 100644 karyons_p2p/src/protocols/mod.rs delete mode 100644 karyons_p2p/src/protocols/ping.rs delete mode 100644 karyons_p2p/src/routing_table/bucket.rs delete mode 100644 karyons_p2p/src/routing_table/entry.rs delete mode 100644 karyons_p2p/src/routing_table/mod.rs delete mode 100644 karyons_p2p/src/utils/mod.rs delete mode 100644 karyons_p2p/src/utils/version.rs (limited to 'karyons_p2p/src') diff --git a/karyons_p2p/src/backend.rs b/karyons_p2p/src/backend.rs deleted file mode 100644 index 290e3e7..0000000 --- a/karyons_p2p/src/backend.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::sync::Arc; - -use log::info; - -use karyons_core::{pubsub::Subscription, Executor}; - -use crate::{ - config::Config, - discovery::{ArcDiscovery, Discovery}, - monitor::{Monitor, MonitorEvent}, - net::ConnQueue, - peer_pool::PeerPool, - protocol::{ArcProtocol, Protocol}, - ArcPeer, PeerID, Result, -}; - -pub type ArcBackend = Arc; - -/// Backend serves as the central entry point for initiating and managing -/// the P2P network. -/// -/// -/// # Example -/// ``` -/// use std::sync::Arc; -/// -/// use easy_parallel::Parallel; -/// use smol::{channel as smol_channel, future, Executor}; -/// -/// use karyons_p2p::{Backend, Config, PeerID}; -/// -/// let peer_id = PeerID::random(); -/// -/// // Create the configuration for the backend. -/// let mut config = Config::default(); -/// -/// // Create a new Backend -/// let backend = Backend::new(peer_id, config); -/// -/// // Create a new Executor -/// let ex = Arc::new(Executor::new()); -/// -/// let task = async { -/// // Run the backend -/// backend.run(ex.clone()).await.unwrap(); -/// -/// // .... -/// -/// // Shutdown the backend -/// backend.shutdown().await; -/// }; -/// -/// future::block_on(ex.run(task)); -/// -/// ``` -pub struct Backend { - /// The Configuration for the P2P network. - config: Arc, - - /// Peer ID. - id: PeerID, - - /// Responsible for network and system monitoring. - monitor: Arc, - - /// Discovery instance. - discovery: ArcDiscovery, - - /// PeerPool instance. - peer_pool: Arc, -} - -impl Backend { - /// Creates a new Backend. - pub fn new(id: PeerID, config: Config) -> ArcBackend { - let config = Arc::new(config); - let monitor = Arc::new(Monitor::new()); - - let conn_queue = ConnQueue::new(); - - let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone()); - let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone()); - - Arc::new(Self { - id: id.clone(), - monitor, - discovery, - config, - peer_pool, - }) - } - - /// Run the Backend, starting the PeerPool and Discovery instances. - pub async fn run(self: &Arc, ex: Executor<'_>) -> Result<()> { - info!("Run the backend {}", self.id); - self.peer_pool.start(ex.clone()).await?; - self.discovery.start(ex.clone()).await?; - Ok(()) - } - - /// Attach a custom protocol to the network - pub async fn attach_protocol( - &self, - c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static, - ) -> Result<()> { - self.peer_pool.attach_protocol::

(Box::new(c)).await - } - - /// Returns the number of currently connected peers. - pub async fn peers(&self) -> usize { - self.peer_pool.peers_len().await - } - - /// Returns the `Config`. - pub fn config(&self) -> Arc { - self.config.clone() - } - - /// Returns the number of occupied inbound slots. - pub fn inbound_slots(&self) -> usize { - self.discovery.inbound_slots.load() - } - - /// Returns the number of occupied outbound slots. - pub fn outbound_slots(&self) -> usize { - self.discovery.outbound_slots.load() - } - - /// Subscribes to the monitor to receive network events. - pub async fn monitor(&self) -> Subscription { - self.monitor.subscribe().await - } - - /// Shuts down the Backend. - pub async fn shutdown(&self) { - self.discovery.shutdown().await; - self.peer_pool.shutdown().await; - } -} diff --git a/karyons_p2p/src/config.rs b/karyons_p2p/src/config.rs deleted file mode 100644 index ebecbf0..0000000 --- a/karyons_p2p/src/config.rs +++ /dev/null @@ -1,105 +0,0 @@ -use karyons_net::{Endpoint, Port}; - -use crate::utils::Version; - -/// the Configuration for the P2P network. -pub struct Config { - /// Represents the network version. - pub version: Version, - - ///////////////// - // PeerPool - //////////////// - /// Timeout duration for the handshake with new peers, in seconds. - pub handshake_timeout: u64, - /// Interval at which the ping protocol sends ping messages to a peer to - /// maintain connections, in seconds. - pub ping_interval: u64, - /// Timeout duration for receiving the pong message corresponding to the - /// sent ping message, in seconds. - pub ping_timeout: u64, - /// The maximum number of retries for outbound connection establishment. - pub max_connect_retries: usize, - - ///////////////// - // DISCOVERY - //////////////// - /// A list of bootstrap peers for the seeding process. - pub bootstrap_peers: Vec, - /// An optional listening endpoint to accept incoming connections. - pub listen_endpoint: Option, - /// A list of endpoints representing peers that the `Discovery` will - /// manually connect to. - pub peer_endpoints: Vec, - /// The number of available inbound slots for incoming connections. - pub inbound_slots: usize, - /// The number of available outbound slots for outgoing connections. - pub outbound_slots: usize, - /// TCP/UDP port for lookup and refresh processes. - pub discovery_port: Port, - /// Time interval, in seconds, at which the Discovery restarts the - /// seeding process. - pub seeding_interval: u64, - - ///////////////// - // LOOKUP - //////////////// - /// The number of available inbound slots for incoming connections during - /// the lookup process. - pub lookup_inbound_slots: usize, - /// The number of available outbound slots for outgoing connections during - /// the lookup process. - pub lookup_outbound_slots: usize, - /// Timeout duration for a peer response during the lookup process, in - /// seconds. - pub lookup_response_timeout: u64, - /// Maximum allowable time for a live connection with a peer during the - /// lookup process, in seconds. - pub lookup_connection_lifespan: u64, - /// The maximum number of retries for outbound connection establishment - /// during the lookup process. - pub lookup_connect_retries: usize, - - ///////////////// - // REFRESH - //////////////// - /// Interval at which the table refreshes its entries, in seconds. - pub refresh_interval: u64, - /// Timeout duration for a peer response during the table refresh process, - /// in seconds. - pub refresh_response_timeout: u64, - /// The maximum number of retries for outbound connection establishment - /// during the refresh process. - pub refresh_connect_retries: usize, -} - -impl Default for Config { - fn default() -> Self { - Config { - version: "0.1.0".parse().unwrap(), - - handshake_timeout: 2, - ping_interval: 20, - ping_timeout: 2, - - bootstrap_peers: vec![], - listen_endpoint: None, - peer_endpoints: vec![], - inbound_slots: 12, - outbound_slots: 12, - max_connect_retries: 3, - discovery_port: 0, - seeding_interval: 60, - - lookup_inbound_slots: 20, - lookup_outbound_slots: 20, - lookup_response_timeout: 1, - lookup_connection_lifespan: 3, - lookup_connect_retries: 3, - - refresh_interval: 1800, - refresh_response_timeout: 1, - refresh_connect_retries: 3, - } - } -} diff --git a/karyons_p2p/src/discovery/lookup.rs b/karyons_p2p/src/discovery/lookup.rs deleted file mode 100644 index f404133..0000000 --- a/karyons_p2p/src/discovery/lookup.rs +++ /dev/null @@ -1,366 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use futures_util::{stream::FuturesUnordered, StreamExt}; -use log::{error, trace}; -use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; -use smol::lock::{Mutex, RwLock}; - -use karyons_core::{async_utils::timeout, utils::decode, Executor}; - -use karyons_net::{Conn, Endpoint}; - -use crate::{ - io_codec::IOCodec, - message::{ - get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, - ShutdownMsg, - }, - monitor::{ConnEvent, DiscoveryEvent, Monitor}, - net::{ConnectionSlots, Connector, Listener}, - routing_table::RoutingTable, - utils::version_match, - Config, Error, PeerID, Result, -}; - -/// Maximum number of peers that can be returned in a PeersMsg. -pub const MAX_PEERS_IN_PEERSMSG: usize = 10; - -pub struct LookupService { - /// Peer's ID - id: PeerID, - - /// Routing Table - table: Arc>, - - /// Listener - listener: Arc, - /// Connector - connector: Arc, - - /// Outbound slots. - outbound_slots: Arc, - - /// Resolved listen endpoint - listen_endpoint: Option>, - - /// Holds the configuration for the P2P network. - config: Arc, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl LookupService { - /// Creates a new lookup service - pub fn new( - id: &PeerID, - table: Arc>, - config: Arc, - monitor: Arc, - ) -> Self { - let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); - let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); - - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); - let connector = Connector::new( - config.lookup_connect_retries, - outbound_slots.clone(), - monitor.clone(), - ); - - let listen_endpoint = config - .listen_endpoint - .as_ref() - .map(|endpoint| RwLock::new(endpoint.clone())); - - Self { - id: id.clone(), - table, - listener, - connector, - outbound_slots, - listen_endpoint, - config, - monitor, - } - } - - /// Start the lookup service. - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { - self.start_listener(ex).await?; - Ok(()) - } - - /// Set the resolved listen endpoint. - pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { - if let Some(endpoint) = &self.listen_endpoint { - *endpoint.write().await = resolved_endpoint.clone(); - } - } - - /// Shuts down the lookup service. - pub async fn shutdown(&self) { - self.connector.shutdown().await; - self.listener.shutdown().await; - } - - /// Starts iterative lookup and populate the routing table. - /// - /// This method begins by generating a random peer ID and connecting to the - /// provided endpoint. It then sends a FindPeer message containing the - /// randomly generated peer ID. Upon receiving peers from the initial lookup, - /// it starts connecting to these received peers and sends them a FindPeer - /// message that contains our own peer ID. - pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> { - trace!("Lookup started {endpoint}"); - self.monitor - .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) - .await; - - let mut random_peers = vec![]; - if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await { - self.monitor - .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) - .await; - return Err(err); - }; - - let mut peer_buffer = vec![]; - self.self_lookup(&random_peers, &mut peer_buffer).await; - - while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG { - match random_peers.pop() { - Some(p) => peer_buffer.push(p), - None => break, - } - } - - for peer in peer_buffer.iter() { - let mut table = self.table.lock().await; - let result = table.add_entry(peer.clone().into()); - trace!("Add entry {:?}", result); - } - - self.monitor - .notify(&DiscoveryEvent::LookupSucceeded(endpoint.clone(), peer_buffer.len()).into()) - .await; - - Ok(()) - } - - /// Starts a random lookup - /// - /// This will perfom lookup on a random generated PeerID - async fn random_lookup( - &self, - endpoint: &Endpoint, - random_peers: &mut Vec, - ) -> Result<()> { - for _ in 0..2 { - let peer_id = PeerID::random(); - let peers = self.connect(&peer_id, endpoint.clone()).await?; - for peer in peers { - if random_peers.contains(&peer) - || peer.peer_id == self.id - || self.table.lock().await.contains_key(&peer.peer_id.0) - { - continue; - } - - random_peers.push(peer); - } - } - - Ok(()) - } - - /// Starts a self lookup - async fn self_lookup(&self, random_peers: &Vec, peer_buffer: &mut Vec) { - let mut tasks = FuturesUnordered::new(); - for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { - let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); - tasks.push(self.connect(&self.id, endpoint)) - } - - while let Some(result) = tasks.next().await { - match result { - Ok(peers) => peer_buffer.extend(peers), - Err(err) => { - error!("Failed to do self lookup: {err}"); - } - } - } - } - - /// Connects to the given endpoint - async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result> { - let conn = self.connector.connect(&endpoint).await?; - let io_codec = IOCodec::new(conn); - let result = self.handle_outbound(io_codec, peer_id).await; - - self.monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) - .await; - self.outbound_slots.remove().await; - - result - } - - /// Handles outbound connection - async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result> { - trace!("Send Ping msg"); - self.send_ping_msg(&io_codec).await?; - - trace!("Send FindPeer msg"); - let peers = self.send_findpeer_msg(&io_codec, peer_id).await?; - - if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { - return Err(Error::Lookup("Received too many peers in PeersMsg")); - } - - trace!("Send Peer msg"); - if let Some(endpoint) = &self.listen_endpoint { - self.send_peer_msg(&io_codec, endpoint.read().await.clone()) - .await?; - } - - trace!("Send Shutdown msg"); - self.send_shutdown_msg(&io_codec).await?; - - Ok(peers.0) - } - - /// Start a listener. - async fn start_listener(self: &Arc, ex: Executor<'_>) -> Result<()> { - let addr = match &self.listen_endpoint { - Some(a) => a.read().await.addr()?.clone(), - None => return Ok(()), - }; - - let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); - - let selfc = self.clone(); - let callback = |conn: Conn| async move { - let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); - timeout(t, selfc.handle_inbound(conn)).await??; - Ok(()) - }; - - self.listener.start(ex, endpoint.clone(), callback).await?; - Ok(()) - } - - /// Handles inbound connection - async fn handle_inbound(self: &Arc, conn: Conn) -> Result<()> { - let io_codec = IOCodec::new(conn); - loop { - let msg: NetMsg = io_codec.read().await?; - trace!("Receive msg {:?}", msg.header.command); - - if let NetMsgCmd::Shutdown = msg.header.command { - return Ok(()); - } - - match &msg.header.command { - NetMsgCmd::Ping => { - let (ping_msg, _) = decode::(&msg.payload)?; - if !version_match(&self.config.version.req, &ping_msg.version) { - return Err(Error::IncompatibleVersion("system: {}".into())); - } - self.send_pong_msg(ping_msg.nonce, &io_codec).await?; - } - NetMsgCmd::FindPeer => { - let (findpeer_msg, _) = decode::(&msg.payload)?; - let peer_id = findpeer_msg.0; - self.send_peers_msg(&peer_id, &io_codec).await?; - } - NetMsgCmd::Peer => { - let (peer, _) = decode::(&msg.payload)?; - let result = self.table.lock().await.add_entry(peer.clone().into()); - trace!("Add entry result: {:?}", result); - } - c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))), - } - } - } - - /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> { - trace!("Send Pong msg"); - - let mut nonce: [u8; 32] = [0; 32]; - RngCore::fill_bytes(&mut OsRng, &mut nonce); - - let ping_msg = PingMsg { - version: self.config.version.v.clone(), - nonce, - }; - io_codec.write(NetMsgCmd::Ping, &ping_msg).await?; - - let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; - - let payload = get_msg_payload!(Pong, recv_msg); - let (pong_msg, _) = decode::(&payload)?; - - if ping_msg.nonce != pong_msg.0 { - return Err(Error::InvalidPongMsg); - } - - Ok(()) - } - - /// Sends a Pong msg - async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> { - trace!("Send Pong msg"); - io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; - Ok(()) - } - - /// Sends a FindPeer msg and wait to receivet the Peers msg. - async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result { - trace!("Send FindPeer msg"); - io_codec - .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) - .await?; - - let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; - - let payload = get_msg_payload!(Peers, recv_msg); - let (peers, _) = decode(&payload)?; - - Ok(peers) - } - - /// Sends a Peers msg. - async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> { - trace!("Send Peers msg"); - let table = self.table.lock().await; - let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); - let peers: Vec = entries.into_iter().map(|e| e.into()).collect(); - drop(table); - io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?; - Ok(()) - } - - /// Sends a Peer msg. - async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> { - trace!("Send Peer msg"); - let peer_msg = PeerMsg { - addr: endpoint.addr()?.clone(), - port: *endpoint.port()?, - discovery_port: self.config.discovery_port, - peer_id: self.id.clone(), - }; - io_codec.write(NetMsgCmd::Peer, &peer_msg).await?; - Ok(()) - } - - /// Sends a Shutdown msg. - async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> { - trace!("Send Shutdown msg"); - io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; - Ok(()) - } -} diff --git a/karyons_p2p/src/discovery/mod.rs b/karyons_p2p/src/discovery/mod.rs deleted file mode 100644 index 94b350b..0000000 --- a/karyons_p2p/src/discovery/mod.rs +++ /dev/null @@ -1,262 +0,0 @@ -mod lookup; -mod refresh; - -use std::sync::Arc; - -use log::{error, info}; -use rand::{rngs::OsRng, seq::SliceRandom}; -use smol::lock::Mutex; - -use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, -}; - -use karyons_net::{Conn, Endpoint}; - -use crate::{ - config::Config, - monitor::Monitor, - net::ConnQueue, - net::{ConnDirection, ConnectionSlots, Connector, Listener}, - routing_table::{ - Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, - UNREACHABLE_ENTRY, UNSTABLE_ENTRY, - }, - Error, PeerID, Result, -}; - -use lookup::LookupService; -use refresh::RefreshService; - -pub type ArcDiscovery = Arc; - -pub struct Discovery { - /// Routing table - table: Arc>, - - /// Lookup Service - lookup_service: Arc, - - /// Refresh Service - refresh_service: Arc, - - /// Connector - connector: Arc, - /// Listener - listener: Arc, - - /// Connection queue - conn_queue: Arc, - - /// Inbound slots. - pub(crate) inbound_slots: Arc, - /// Outbound slots. - pub(crate) outbound_slots: Arc, - - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Holds the configuration for the P2P network. - config: Arc, -} - -impl Discovery { - /// Creates a new Discovery - pub fn new( - peer_id: &PeerID, - conn_queue: Arc, - config: Arc, - monitor: Arc, - ) -> ArcDiscovery { - let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); - let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); - - let table_key = peer_id.0; - let table = Arc::new(Mutex::new(RoutingTable::new(table_key))); - - let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone()); - let lookup_service = - LookupService::new(peer_id, table.clone(), config.clone(), monitor.clone()); - - let connector = Connector::new( - config.max_connect_retries, - outbound_slots.clone(), - monitor.clone(), - ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone()); - - Arc::new(Self { - refresh_service: Arc::new(refresh_service), - lookup_service: Arc::new(lookup_service), - conn_queue, - table, - inbound_slots, - outbound_slots, - connector, - listener, - task_group: TaskGroup::new(), - config, - }) - } - - /// Start the Discovery - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { - // Check if the listen_endpoint is provided, and if so, start a listener. - if let Some(endpoint) = &self.config.listen_endpoint { - // Return an error if the discovery port is set to 0. - if self.config.discovery_port == 0 { - return Err(Error::Config( - "Please add a valid discovery port".to_string(), - )); - } - - let resolved_endpoint = self.start_listener(endpoint, ex.clone()).await?; - - if endpoint.addr()? != resolved_endpoint.addr()? { - info!("Resolved listen endpoint: {resolved_endpoint}"); - self.lookup_service - .set_listen_endpoint(&resolved_endpoint) - .await; - self.refresh_service - .set_listen_endpoint(&resolved_endpoint) - .await; - } - } - - // Start the lookup service - self.lookup_service.start(ex.clone()).await?; - // Start the refresh service - self.refresh_service.start(ex.clone()).await?; - - // Attempt to manually connect to peer endpoints provided in the Config. - for endpoint in self.config.peer_endpoints.iter() { - let _ = self.connect(endpoint, None, ex.clone()).await; - } - - // Start connect loop - let selfc = self.clone(); - self.task_group - .spawn(ex.clone(), selfc.connect_loop(ex), |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Connect loop stopped: {err}"); - } - }); - - Ok(()) - } - - /// Shuts down the discovery - pub async fn shutdown(&self) { - self.task_group.cancel().await; - self.connector.shutdown().await; - self.listener.shutdown().await; - - self.refresh_service.shutdown().await; - self.lookup_service.shutdown().await; - } - - /// Start a listener and on success, return the resolved endpoint. - async fn start_listener( - self: &Arc, - endpoint: &Endpoint, - ex: Executor<'_>, - ) -> Result { - let selfc = self.clone(); - let callback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Inbound).await; - Ok(()) - }; - - let resolved_endpoint = self.listener.start(ex, endpoint.clone(), callback).await?; - Ok(resolved_endpoint) - } - - /// This method will attempt to connect to a peer in the routing table. - /// If the routing table is empty, it will start the seeding process for - /// finding new peers. - /// - /// This will perform a backoff to prevent getting stuck in the loop - /// if the seeding process couldn't find any peers. - async fn connect_loop(self: Arc, ex: Executor<'_>) -> Result<()> { - let backoff = Backoff::new(500, self.config.seeding_interval * 1000); - loop { - let random_entry = self.random_entry(PENDING_ENTRY).await; - match random_entry { - Some(entry) => { - backoff.reset(); - let endpoint = Endpoint::Tcp(entry.addr, entry.port); - self.connect(&endpoint, Some(entry.key.into()), ex.clone()) - .await; - } - None => { - backoff.sleep().await; - self.start_seeding().await; - } - } - } - } - - /// Connect to the given endpoint using the connector - async fn connect(self: &Arc, endpoint: &Endpoint, pid: Option, ex: Executor<'_>) { - let selfc = self.clone(); - let pid_cloned = pid.clone(); - let cback = |conn: Conn| async move { - selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; - if let Some(pid) = pid_cloned { - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; - } - Ok(()) - }; - - let res = self.connector.connect_with_cback(ex, endpoint, cback).await; - - if let Some(pid) = &pid { - match res { - Ok(_) => { - self.update_entry(pid, CONNECTED_ENTRY).await; - } - Err(_) => { - self.update_entry(pid, UNREACHABLE_ENTRY).await; - } - } - } - } - - /// Starts seeding process. - /// - /// This method randomly selects a peer from the routing table and - /// attempts to connect to that peer for the initial lookup. If the routing - /// table doesn't have an available entry, it will connect to one of the - /// provided bootstrap endpoints in the `Config` and initiate the lookup. - async fn start_seeding(&self) { - match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { - Some(entry) => { - let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); - if let Err(err) = self.lookup_service.start_lookup(&endpoint).await { - self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; - error!("Failed to do lookup: {endpoint}: {err}"); - } - } - None => { - let peers = &self.config.bootstrap_peers; - for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) { - if let Err(err) = self.lookup_service.start_lookup(endpoint).await { - error!("Failed to do lookup: {endpoint}: {err}"); - } - } - } - } - } - - /// Returns a random entry from routing table. - async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option { - self.table.lock().await.random_entry(entry_flag).cloned() - } - - /// Update the entry status - async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { - let table = &mut self.table.lock().await; - table.update_entry(&pid.0, entry_flag); - } -} diff --git a/karyons_p2p/src/discovery/refresh.rs b/karyons_p2p/src/discovery/refresh.rs deleted file mode 100644 index 7582c84..0000000 --- a/karyons_p2p/src/discovery/refresh.rs +++ /dev/null @@ -1,289 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use bincode::{Decode, Encode}; -use log::{error, info, trace}; -use rand::{rngs::OsRng, RngCore}; -use smol::{ - lock::{Mutex, RwLock}, - stream::StreamExt, - Timer, -}; - -use karyons_core::{ - async_utils::{timeout, Backoff, TaskGroup, TaskResult}, - utils::{decode, encode}, - Executor, -}; - -use karyons_net::{dial_udp, listen_udp, Addr, Connection, Endpoint, NetError, Port, UdpConn}; - -/// Maximum failures for an entry before removing it from the routing table. -pub const MAX_FAILURES: u32 = 3; - -/// Ping message size -const PINGMSG_SIZE: usize = 32; - -use crate::{ - monitor::{ConnEvent, DiscoveryEvent, Monitor}, - routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY}, - Config, Error, Result, -}; - -#[derive(Decode, Encode, Debug, Clone)] -pub struct PingMsg(pub [u8; 32]); - -#[derive(Decode, Encode, Debug)] -pub struct PongMsg(pub [u8; 32]); - -pub struct RefreshService { - /// Routing table - table: Arc>, - - /// Resolved listen endpoint - listen_endpoint: Option>, - - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Holds the configuration for the P2P network. - config: Arc, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl RefreshService { - /// Creates a new refresh service - pub fn new( - config: Arc, - table: Arc>, - monitor: Arc, - ) -> Self { - let listen_endpoint = config - .listen_endpoint - .as_ref() - .map(|endpoint| RwLock::new(endpoint.clone())); - - Self { - table, - listen_endpoint, - task_group: TaskGroup::new(), - config, - monitor, - } - } - - /// Start the refresh service - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { - if let Some(endpoint) = &self.listen_endpoint { - let endpoint = endpoint.read().await; - let addr = endpoint.addr()?; - let port = self.config.discovery_port; - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(addr.clone(), port), - |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {err}"); - } - }, - ); - } - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.refresh_loop(ex.clone()), - |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Refresh loop stopped: {err}"); - } - }, - ); - - Ok(()) - } - - /// Set the resolved listen endpoint. - pub async fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) { - if let Some(endpoint) = &self.listen_endpoint { - *endpoint.write().await = resolved_endpoint.clone(); - } - } - - /// Shuts down the refresh service - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - /// Initiates periodic refreshing of the routing table. This function will - /// select 8 random entries from each bucket in the routing table and start - /// sending Ping messages to the entries. - async fn refresh_loop(self: Arc, ex: Executor<'_>) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); - loop { - timer.next().await; - trace!("Start refreshing the routing table..."); - - self.monitor - .notify(&DiscoveryEvent::RefreshStarted.into()) - .await; - - let table = self.table.lock().await; - let mut entries: Vec = vec![]; - for bucket in table.iter() { - for entry in bucket.random_iter(8) { - entries.push(entry.clone()) - } - } - drop(table); - - self.clone().do_refresh(&entries, ex.clone()).await; - } - } - - /// Iterates over the entries and spawns a new task for each entry to - /// initiate a connection attempt to that entry. - async fn do_refresh(self: Arc, entries: &[BucketEntry], ex: Executor<'_>) { - for chunk in entries.chunks(16) { - let mut tasks = Vec::new(); - for bucket_entry in chunk { - if bucket_entry.is_connected() { - continue; - } - - if bucket_entry.failures >= MAX_FAILURES { - self.table - .lock() - .await - .remove_entry(&bucket_entry.entry.key); - return; - } - - tasks.push(ex.spawn(self.clone().refresh_entry(bucket_entry.clone()))) - } - - for task in tasks { - task.await; - } - } - } - - /// Initiates refresh for a specific entry within the routing table. It - /// updates the routing table according to the result. - async fn refresh_entry(self: Arc, bucket_entry: BucketEntry) { - let key = &bucket_entry.entry.key; - match self.connect(&bucket_entry.entry).await { - Ok(_) => { - self.table.lock().await.update_entry(key, PENDING_ENTRY); - } - Err(err) => { - trace!("Failed to refresh entry {:?}: {err}", key); - let table = &mut self.table.lock().await; - if bucket_entry.failures >= MAX_FAILURES { - table.remove_entry(key); - return; - } - table.update_entry(key, UNREACHABLE_ENTRY); - } - } - } - - /// Initiates a UDP connection with the entry and attempts to send a Ping - /// message. If it fails, it retries according to the allowed retries - /// specified in the Config, with backoff between each retry. - async fn connect(&self, entry: &Entry) -> Result<()> { - let mut retry = 0; - let conn = dial_udp(&entry.addr, &entry.discovery_port).await?; - let backoff = Backoff::new(100, 5000); - while retry < self.config.refresh_connect_retries { - match self.send_ping_msg(&conn).await { - Ok(()) => return Ok(()), - Err(Error::KaryonsNet(NetError::Timeout)) => { - retry += 1; - backoff.sleep().await; - } - Err(err) => { - return Err(err); - } - } - } - - Err(NetError::Timeout.into()) - } - - /// Set up a UDP listener and start listening for Ping messages from other - /// peers. - async fn listen_loop(self: Arc, addr: Addr, port: Port) -> Result<()> { - let endpoint = Endpoint::Udp(addr.clone(), port); - let conn = match listen_udp(&addr, &port).await { - Ok(c) => { - self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) - .await; - c - } - Err(err) => { - self.monitor - .notify(&ConnEvent::ListenFailed(endpoint.clone()).into()) - .await; - return Err(err.into()); - } - }; - info!("Start listening on {endpoint}"); - - loop { - let res = self.listen_to_ping_msg(&conn).await; - if let Err(err) = res { - trace!("Failed to handle ping msg {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - } - } - } - - /// Listen to receive a Ping message and respond with a Pong message. - async fn listen_to_ping_msg(&self, conn: &UdpConn) -> Result<()> { - let mut buf = [0; PINGMSG_SIZE]; - let (_, endpoint) = conn.recv_from(&mut buf).await?; - - self.monitor - .notify(&ConnEvent::Accepted(endpoint.clone()).into()) - .await; - - let (ping_msg, _) = decode::(&buf)?; - - let pong_msg = PongMsg(ping_msg.0); - let buffer = encode(&pong_msg)?; - - conn.send_to(&buffer, &endpoint).await?; - - self.monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) - .await; - Ok(()) - } - - /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, conn: &UdpConn) -> Result<()> { - let mut nonce: [u8; 32] = [0; 32]; - RngCore::fill_bytes(&mut OsRng, &mut nonce); - - let ping_msg = PingMsg(nonce); - let buffer = encode(&ping_msg)?; - conn.send(&buffer).await?; - - let buf = &mut [0; PINGMSG_SIZE]; - let t = Duration::from_secs(self.config.refresh_response_timeout); - timeout(t, conn.recv(buf)).await??; - - let (pong_msg, _) = decode::(buf)?; - - if ping_msg.0 != pong_msg.0 { - return Err(Error::InvalidPongMsg); - } - - Ok(()) - } -} diff --git a/karyons_p2p/src/error.rs b/karyons_p2p/src/error.rs deleted file mode 100644 index 945e90a..0000000 --- a/karyons_p2p/src/error.rs +++ /dev/null @@ -1,82 +0,0 @@ -use thiserror::Error as ThisError; - -pub type Result = std::result::Result; - -/// Represents Karyons's p2p Error. -#[derive(ThisError, Debug)] -pub enum Error { - #[error("IO Error: {0}")] - IO(#[from] std::io::Error), - - #[error("Unsupported protocol error: {0}")] - UnsupportedProtocol(String), - - #[error("Invalid message error: {0}")] - InvalidMsg(String), - - #[error("Parse error: {0}")] - ParseError(String), - - #[error("Incompatible version error: {0}")] - IncompatibleVersion(String), - - #[error("Config error: {0}")] - Config(String), - - #[error("Peer shutdown")] - PeerShutdown, - - #[error("Invalid Pong Msg")] - InvalidPongMsg, - - #[error("Discovery error: {0}")] - Discovery(&'static str), - - #[error("Lookup error: {0}")] - Lookup(&'static str), - - #[error("Peer already connected")] - PeerAlreadyConnected, - - #[error("Channel Send Error: {0}")] - ChannelSend(String), - - #[error("Channel Receive Error: {0}")] - ChannelRecv(String), - - #[error("CORE::ERROR : {0}")] - KaryonsCore(#[from] karyons_core::error::Error), - - #[error("NET::ERROR : {0}")] - KaryonsNet(#[from] karyons_net::NetError), -} - -impl From> for Error { - fn from(error: smol::channel::SendError) -> Self { - Error::ChannelSend(error.to_string()) - } -} - -impl From for Error { - fn from(error: smol::channel::RecvError) -> Self { - Error::ChannelRecv(error.to_string()) - } -} - -impl From for Error { - fn from(error: std::num::ParseIntError) -> Self { - Error::ParseError(error.to_string()) - } -} - -impl From for Error { - fn from(error: std::num::ParseFloatError) -> Self { - Error::ParseError(error.to_string()) - } -} - -impl From for Error { - fn from(error: semver::Error) -> Self { - Error::ParseError(error.to_string()) - } -} diff --git a/karyons_p2p/src/io_codec.rs b/karyons_p2p/src/io_codec.rs deleted file mode 100644 index 4515832..0000000 --- a/karyons_p2p/src/io_codec.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::time::Duration; - -use bincode::{Decode, Encode}; - -use karyons_core::{ - async_utils::timeout, - utils::{decode, encode, encode_into_slice}, -}; - -use karyons_net::{Connection, NetError}; - -use crate::{ - message::{NetMsg, NetMsgCmd, NetMsgHeader, MAX_ALLOWED_MSG_SIZE, MSG_HEADER_SIZE}, - Error, Result, -}; - -pub trait CodecMsg: Decode + Encode + std::fmt::Debug {} -impl CodecMsg for T {} - -/// I/O codec working with generic network connections. -/// -/// It is responsible for both decoding data received from the network and -/// encoding data before sending it. -pub struct IOCodec { - conn: Box, -} - -impl IOCodec { - /// Creates a new IOCodec. - pub fn new(conn: Box) -> Self { - Self { conn } - } - - /// Reads a message of type `NetMsg` from the connection. - /// - /// It reads the first 6 bytes as the header of the message, then reads - /// and decodes the remaining message data based on the determined header. - pub async fn read(&self) -> Result { - // Read 6 bytes to get the header of the incoming message - let mut buf = [0; MSG_HEADER_SIZE]; - self.conn.recv(&mut buf).await?; - - // Decode the header from bytes to NetMsgHeader - let (header, _) = decode::(&buf)?; - - if header.payload_size > MAX_ALLOWED_MSG_SIZE { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size".to_string(), - )); - } - - // Create a buffer to hold the message based on its length - let mut payload = vec![0; header.payload_size as usize]; - self.conn.recv(&mut payload).await?; - - Ok(NetMsg { header, payload }) - } - - /// Writes a message of type `T` to the connection. - /// - /// Before appending the actual message payload, it calculates the length of - /// the encoded message in bytes and appends this length to the message header. - pub async fn write(&self, command: NetMsgCmd, msg: &T) -> Result<()> { - let payload = encode(msg)?; - - // Create a buffer to hold the message header (6 bytes) - let header_buf = &mut [0; MSG_HEADER_SIZE]; - let header = NetMsgHeader { - command, - payload_size: payload.len() as u32, - }; - encode_into_slice(&header, header_buf)?; - - let mut buffer = vec![]; - // Append the header bytes to the buffer - buffer.extend_from_slice(header_buf); - // Append the message payload to the buffer - buffer.extend_from_slice(&payload); - - self.conn.send(&buffer).await?; - Ok(()) - } - - /// Reads a message of type `NetMsg` with the given timeout. - pub async fn read_timeout(&self, duration: Duration) -> Result { - timeout(duration, self.read()) - .await - .map_err(|_| NetError::Timeout)? - } - - /// Writes a message of type `T` with the given timeout. - pub async fn write_timeout( - &self, - command: NetMsgCmd, - msg: &T, - duration: Duration, - ) -> Result<()> { - timeout(duration, self.write(command, msg)) - .await - .map_err(|_| NetError::Timeout)? - } -} diff --git a/karyons_p2p/src/lib.rs b/karyons_p2p/src/lib.rs deleted file mode 100644 index 08ba059..0000000 --- a/karyons_p2p/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod backend; -mod config; -mod discovery; -mod error; -mod io_codec; -mod message; -mod net; -mod peer; -mod peer_pool; -mod protocols; -mod routing_table; -mod utils; - -/// Responsible for network and system monitoring. -/// [`Read More`](./monitor/struct.Monitor.html) -pub mod monitor; -/// Defines the protocol trait. -/// [`Read More`](./protocol/trait.Protocol.html) -pub mod protocol; - -pub use backend::{ArcBackend, Backend}; -pub use config::Config; -pub use error::Error as P2pError; -pub use peer::{ArcPeer, PeerID}; -pub use utils::Version; - -use error::{Error, Result}; diff --git a/karyons_p2p/src/message.rs b/karyons_p2p/src/message.rs deleted file mode 100644 index 833f6f4..0000000 --- a/karyons_p2p/src/message.rs +++ /dev/null @@ -1,133 +0,0 @@ -use std::collections::HashMap; - -use bincode::{Decode, Encode}; - -use karyons_net::{Addr, Port}; - -use crate::{protocol::ProtocolID, routing_table::Entry, utils::VersionInt, PeerID}; - -/// The size of the message header, in bytes. -pub const MSG_HEADER_SIZE: usize = 6; - -/// The maximum allowed size for a message in bytes. -pub const MAX_ALLOWED_MSG_SIZE: u32 = 1000000; - -/// Defines the main message in the Karyon P2P network. -/// -/// This message structure consists of a header and payload, where the header -/// typically contains essential information about the message, and the payload -/// contains the actual data being transmitted. -#[derive(Decode, Encode, Debug, Clone)] -pub struct NetMsg { - pub header: NetMsgHeader, - pub payload: Vec, -} - -/// Represents the header of a message. -#[derive(Decode, Encode, Debug, Clone)] -pub struct NetMsgHeader { - pub command: NetMsgCmd, - pub payload_size: u32, -} - -/// Defines message commands. -#[derive(Decode, Encode, Debug, Clone)] -#[repr(u8)] -pub enum NetMsgCmd { - Version, - Verack, - Protocol, - Shutdown, - - // NOTE: The following commands are used during the lookup process. - Ping, - Pong, - FindPeer, - Peer, - Peers, -} - -/// Defines a message related to a specific protocol. -#[derive(Decode, Encode, Debug, Clone)] -pub struct ProtocolMsg { - pub protocol_id: ProtocolID, - pub payload: Vec, -} - -/// Version message, providing information about a peer's capabilities. -#[derive(Decode, Encode, Debug, Clone)] -pub struct VerMsg { - pub peer_id: PeerID, - pub version: VersionInt, - pub protocols: HashMap, -} - -/// VerAck message acknowledging the receipt of a Version message. -#[derive(Decode, Encode, Debug, Clone)] -pub struct VerAckMsg(pub PeerID); - -/// Shutdown message. -#[derive(Decode, Encode, Debug, Clone)] -pub struct ShutdownMsg(pub u8); - -/// Ping message with a nonce and version information. -#[derive(Decode, Encode, Debug, Clone)] -pub struct PingMsg { - pub nonce: [u8; 32], - pub version: VersionInt, -} - -/// Ping message with a nonce. -#[derive(Decode, Encode, Debug)] -pub struct PongMsg(pub [u8; 32]); - -/// FindPeer message used to find a specific peer. -#[derive(Decode, Encode, Debug)] -pub struct FindPeerMsg(pub PeerID); - -/// PeerMsg containing information about a peer. -#[derive(Decode, Encode, Debug, Clone, PartialEq, Eq)] -pub struct PeerMsg { - pub peer_id: PeerID, - pub addr: Addr, - pub port: Port, - pub discovery_port: Port, -} - -/// PeersMsg a list of `PeerMsg`. -#[derive(Decode, Encode, Debug)] -pub struct PeersMsg(pub Vec); - -macro_rules! get_msg_payload { - ($a:ident, $b:expr) => { - if let NetMsgCmd::$a = $b.header.command { - $b.payload - } else { - return Err(Error::InvalidMsg(format!("Unexpected msg{:?}", $b))); - } - }; -} - -pub(super) use get_msg_payload; - -impl From for PeerMsg { - fn from(entry: Entry) -> PeerMsg { - PeerMsg { - peer_id: PeerID(entry.key), - addr: entry.addr, - port: entry.port, - discovery_port: entry.discovery_port, - } - } -} - -impl From for Entry { - fn from(peer: PeerMsg) -> Entry { - Entry { - key: peer.peer_id.0, - addr: peer.addr, - port: peer.port, - discovery_port: peer.discovery_port, - } - } -} diff --git a/karyons_p2p/src/monitor.rs b/karyons_p2p/src/monitor.rs deleted file mode 100644 index ee0bf44..0000000 --- a/karyons_p2p/src/monitor.rs +++ /dev/null @@ -1,154 +0,0 @@ -use std::fmt; - -use crate::PeerID; - -use karyons_core::pubsub::{ArcPublisher, Publisher, Subscription}; - -use karyons_net::Endpoint; - -/// Responsible for network and system monitoring. -/// -/// It use pub-sub pattern to notify the subscribers with new events. -/// -/// # Example -/// -/// ``` -/// use karyons_p2p::{Config, Backend, PeerID}; -/// async { -/// -/// let backend = Backend::new(PeerID::random(), Config::default()); -/// -/// // Create a new Subscription -/// let sub = backend.monitor().await; -/// -/// let event = sub.recv().await; -/// }; -/// ``` -pub struct Monitor { - inner: ArcPublisher, -} - -impl Monitor { - /// Creates a new Monitor - pub(crate) fn new() -> Monitor { - Self { - inner: Publisher::new(), - } - } - - /// Sends a new monitor event to all subscribers. - pub async fn notify(&self, event: &MonitorEvent) { - self.inner.notify(event).await; - } - - /// Subscribes to listen to new events. - pub async fn subscribe(&self) -> Subscription { - self.inner.subscribe().await - } -} - -/// Defines various type of event that can be monitored. -#[derive(Clone, Debug)] -pub enum MonitorEvent { - Conn(ConnEvent), - PeerPool(PeerPoolEvent), - Discovery(DiscoveryEvent), -} - -/// Defines connection-related events. -#[derive(Clone, Debug)] -pub enum ConnEvent { - Connected(Endpoint), - ConnectRetried(Endpoint), - ConnectFailed(Endpoint), - Accepted(Endpoint), - AcceptFailed, - Disconnected(Endpoint), - Listening(Endpoint), - ListenFailed(Endpoint), -} - -/// Defines `PeerPool` events. -#[derive(Clone, Debug)] -pub enum PeerPoolEvent { - NewPeer(PeerID), - RemovePeer(PeerID), -} - -/// Defines `Discovery` events. -#[derive(Clone, Debug)] -pub enum DiscoveryEvent { - LookupStarted(Endpoint), - LookupFailed(Endpoint), - LookupSucceeded(Endpoint, usize), - RefreshStarted, -} - -impl fmt::Display for MonitorEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - MonitorEvent::Conn(e) => format!("Connection Event: {e}"), - MonitorEvent::PeerPool(e) => format!("PeerPool Event: {e}"), - MonitorEvent::Discovery(e) => format!("Discovery Event: {e}"), - }; - write!(f, "{}", val) - } -} - -impl fmt::Display for ConnEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - ConnEvent::Connected(endpoint) => format!("Connected: {endpoint}"), - ConnEvent::ConnectFailed(endpoint) => format!("ConnectFailed: {endpoint}"), - ConnEvent::ConnectRetried(endpoint) => format!("ConnectRetried: {endpoint}"), - ConnEvent::AcceptFailed => "AcceptFailed".to_string(), - ConnEvent::Accepted(endpoint) => format!("Accepted: {endpoint}"), - ConnEvent::Disconnected(endpoint) => format!("Disconnected: {endpoint}"), - ConnEvent::Listening(endpoint) => format!("Listening: {endpoint}"), - ConnEvent::ListenFailed(endpoint) => format!("ListenFailed: {endpoint}"), - }; - write!(f, "{}", val) - } -} - -impl fmt::Display for PeerPoolEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - PeerPoolEvent::NewPeer(pid) => format!("NewPeer: {pid}"), - PeerPoolEvent::RemovePeer(pid) => format!("RemovePeer: {pid}"), - }; - write!(f, "{}", val) - } -} - -impl fmt::Display for DiscoveryEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let val = match self { - DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"), - DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"), - DiscoveryEvent::LookupSucceeded(endpoint, len) => { - format!("LookupSucceeded: {endpoint} {len}") - } - DiscoveryEvent::RefreshStarted => "RefreshStarted".to_string(), - }; - write!(f, "{}", val) - } -} - -impl From for MonitorEvent { - fn from(val: ConnEvent) -> Self { - MonitorEvent::Conn(val) - } -} - -impl From for MonitorEvent { - fn from(val: PeerPoolEvent) -> Self { - MonitorEvent::PeerPool(val) - } -} - -impl From for MonitorEvent { - fn from(val: DiscoveryEvent) -> Self { - MonitorEvent::Discovery(val) - } -} diff --git a/karyons_p2p/src/net/connection_queue.rs b/karyons_p2p/src/net/connection_queue.rs deleted file mode 100644 index fbc4bfc..0000000 --- a/karyons_p2p/src/net/connection_queue.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::sync::Arc; - -use smol::{channel::Sender, lock::Mutex}; - -use karyons_core::async_utils::CondVar; - -use karyons_net::Conn; - -use crate::net::ConnDirection; - -pub struct NewConn { - pub direction: ConnDirection, - pub conn: Conn, - pub disconnect_signal: Sender<()>, -} - -/// Connection queue -pub struct ConnQueue { - queue: Mutex>, - conn_available: CondVar, -} - -impl ConnQueue { - pub fn new() -> Arc { - Arc::new(Self { - queue: Mutex::new(Vec::new()), - conn_available: CondVar::new(), - }) - } - - /// Push a connection into the queue and wait for the disconnect signal - pub async fn handle(&self, conn: Conn, direction: ConnDirection) { - let (disconnect_signal, chan) = smol::channel::bounded(1); - let new_conn = NewConn { - direction, - conn, - disconnect_signal, - }; - self.queue.lock().await.push(new_conn); - self.conn_available.signal(); - let _ = chan.recv().await; - } - - /// Receive the next connection in the queue - pub async fn next(&self) -> NewConn { - let mut queue = self.queue.lock().await; - while queue.is_empty() { - queue = self.conn_available.wait(queue).await; - } - queue.pop().unwrap() - } -} diff --git a/karyons_p2p/src/net/connector.rs b/karyons_p2p/src/net/connector.rs deleted file mode 100644 index 72dc0d8..0000000 --- a/karyons_p2p/src/net/connector.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{trace, warn}; - -use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, - Executor, -}; -use karyons_net::{dial, Conn, Endpoint, NetError}; - -use crate::{ - monitor::{ConnEvent, Monitor}, - Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating outbound connections with other peers. -pub struct Connector { - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Manages available outbound slots. - connection_slots: Arc, - - /// The maximum number of retries allowed before successfully - /// establishing a connection. - max_retries: usize, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl Connector { - /// Creates a new Connector - pub fn new( - max_retries: usize, - connection_slots: Arc, - monitor: Arc, - ) -> Arc { - Arc::new(Self { - task_group: TaskGroup::new(), - monitor, - connection_slots, - max_retries, - }) - } - - /// Shuts down the connector - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - /// Establish a connection to the specified `endpoint`. If the connection - /// attempt fails, it performs a backoff and retries until the maximum allowed - /// number of retries is exceeded. On a successful connection, it returns a - /// `Conn` instance. - /// - /// This method will block until it finds an available slot. - pub async fn connect(&self, endpoint: &Endpoint) -> Result { - self.connection_slots.wait_for_slot().await; - self.connection_slots.add(); - - let mut retry = 0; - let backoff = Backoff::new(500, 2000); - while retry < self.max_retries { - let conn_result = dial(endpoint).await; - - if let Ok(conn) = conn_result { - self.monitor - .notify(&ConnEvent::Connected(endpoint.clone()).into()) - .await; - return Ok(conn); - } - - self.monitor - .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into()) - .await; - - backoff.sleep().await; - - warn!("try to reconnect {endpoint}"); - retry += 1; - } - - self.monitor - .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into()) - .await; - - self.connection_slots.remove().await; - Err(NetError::Timeout.into()) - } - - /// Establish a connection to the given `endpoint`. For each new connection, - /// it invokes the provided `callback`, and pass the connection to the callback. - pub async fn connect_with_cback<'a, Fut>( - self: &Arc, - ex: Executor<'a>, - endpoint: &Endpoint, - callback: impl FnOnce(Conn) -> Fut + Send + 'a, - ) -> Result<()> - where - Fut: Future> + Send + 'a, - { - let conn = self.connect(endpoint).await?; - - let selfc = self.clone(); - let endpoint = endpoint.clone(); - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Outbound connection dropped: {err}"); - } - selfc - .monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) - .await; - selfc.connection_slots.remove().await; - }; - - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); - - Ok(()) - } -} diff --git a/karyons_p2p/src/net/listener.rs b/karyons_p2p/src/net/listener.rs deleted file mode 100644 index d1a7bfb..0000000 --- a/karyons_p2p/src/net/listener.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::{future::Future, sync::Arc}; - -use log::{error, info, trace}; - -use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - Executor, -}; - -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; - -use crate::{ - monitor::{ConnEvent, Monitor}, - Result, -}; - -use super::slots::ConnectionSlots; - -/// Responsible for creating inbound connections with other peers. -pub struct Listener { - /// Managing spawned tasks. - task_group: TaskGroup, - - /// Manages available inbound slots. - connection_slots: Arc, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl Listener { - /// Creates a new Listener - pub fn new(connection_slots: Arc, monitor: Arc) -> Arc { - Arc::new(Self { - connection_slots, - task_group: TaskGroup::new(), - monitor, - }) - } - - /// Starts a listener on the given `endpoint`. For each incoming connection - /// that is accepted, it invokes the provided `callback`, and pass the - /// connection to the callback. - /// - /// Returns the resloved listening endpoint. - pub async fn start<'a, Fut>( - self: &Arc, - ex: Executor<'a>, - endpoint: Endpoint, - // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result - where - Fut: Future> + Send + 'a, - { - let listener = match listen(&endpoint).await { - Ok(listener) => { - self.monitor - .notify(&ConnEvent::Listening(endpoint.clone()).into()) - .await; - listener - } - Err(err) => { - error!("Failed to listen on {endpoint}: {err}"); - self.monitor - .notify(&ConnEvent::ListenFailed(endpoint).into()) - .await; - return Err(err.into()); - } - }; - - let resolved_endpoint = listener.local_endpoint()?; - - info!("Start listening on {endpoint}"); - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.listen_loop(ex.clone(), listener, callback), - |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }, - ); - Ok(resolved_endpoint) - } - - /// Shuts down the listener - pub async fn shutdown(&self) { - self.task_group.cancel().await; - } - - async fn listen_loop<'a, Fut>( - self: Arc, - ex: Executor<'a>, - listener: Box, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'a, - ) -> Result<()> - where - Fut: Future> + Send + 'a, - { - loop { - // Wait for an available inbound slot. - self.connection_slots.wait_for_slot().await; - let result = listener.accept().await; - - let conn = match result { - Ok(c) => { - self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) - .await; - c - } - Err(err) => { - error!("Failed to accept a new connection: {err}"); - self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); - } - }; - - self.connection_slots.add(); - - let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; - let on_disconnect = |res| async move { - if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); - } - selfc - .monitor - .notify(&ConnEvent::Disconnected(endpoint).into()) - .await; - selfc.connection_slots.remove().await; - }; - - let callback = callback.clone(); - self.task_group - .spawn(ex.clone(), callback(conn), on_disconnect); - } - } -} diff --git a/karyons_p2p/src/net/mod.rs b/karyons_p2p/src/net/mod.rs deleted file mode 100644 index 9cdc748..0000000 --- a/karyons_p2p/src/net/mod.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod connection_queue; -mod connector; -mod listener; -mod slots; - -pub use connection_queue::ConnQueue; -pub use connector::Connector; -pub use listener::Listener; -pub use slots::ConnectionSlots; - -use std::fmt; - -/// Defines the direction of a network connection. -#[derive(Clone, Debug)] -pub enum ConnDirection { - Inbound, - Outbound, -} - -impl fmt::Display for ConnDirection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - ConnDirection::Inbound => write!(f, "Inbound"), - ConnDirection::Outbound => write!(f, "Outbound"), - } - } -} diff --git a/karyons_p2p/src/net/slots.rs b/karyons_p2p/src/net/slots.rs deleted file mode 100644 index 99f0a78..0000000 --- a/karyons_p2p/src/net/slots.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - -use karyons_core::async_utils::CondWait; - -/// Manages available inbound and outbound slots. -pub struct ConnectionSlots { - /// A condvar for notifying when a slot become available. - signal: CondWait, - /// The number of occupied slots - slots: AtomicUsize, - /// The maximum number of slots. - max_slots: usize, -} - -impl ConnectionSlots { - /// Creates a new ConnectionSlots - pub fn new(max_slots: usize) -> Self { - Self { - signal: CondWait::new(), - slots: AtomicUsize::new(0), - max_slots, - } - } - - /// Returns the number of occupied slots - pub fn load(&self) -> usize { - self.slots.load(Ordering::SeqCst) - } - - /// Increases the occupied slots by one. - pub fn add(&self) { - self.slots.fetch_add(1, Ordering::SeqCst); - } - - /// Decreases the occupied slots by one and notifies the waiting signal - /// to start accepting/connecting new connections. - pub async fn remove(&self) { - self.slots.fetch_sub(1, Ordering::SeqCst); - if self.slots.load(Ordering::SeqCst) < self.max_slots { - self.signal.signal().await; - } - } - - /// Waits for a slot to become available. - pub async fn wait_for_slot(&self) { - if self.slots.load(Ordering::SeqCst) < self.max_slots { - return; - } - - // Wait for a signal - self.signal.wait().await; - self.signal.reset().await; - } -} diff --git a/karyons_p2p/src/peer/mod.rs b/karyons_p2p/src/peer/mod.rs deleted file mode 100644 index ee0fdc4..0000000 --- a/karyons_p2p/src/peer/mod.rs +++ /dev/null @@ -1,237 +0,0 @@ -mod peer_id; - -pub use peer_id::PeerID; - -use std::sync::Arc; - -use log::{error, trace}; -use smol::{ - channel::{self, Receiver, Sender}, - lock::RwLock, -}; - -use karyons_core::{ - async_utils::{select, Either, TaskGroup, TaskResult}, - event::{ArcEventSys, EventListener, EventSys}, - utils::{decode, encode}, - Executor, -}; - -use karyons_net::Endpoint; - -use crate::{ - io_codec::{CodecMsg, IOCodec}, - message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, - net::ConnDirection, - peer_pool::{ArcPeerPool, WeakPeerPool}, - protocol::{Protocol, ProtocolEvent, ProtocolID}, - Config, Error, Result, -}; - -pub type ArcPeer = Arc; - -pub struct Peer { - /// Peer's ID - id: PeerID, - - /// A weak pointer to `PeerPool` - peer_pool: WeakPeerPool, - - /// Holds the IOCodec for the peer connection - io_codec: IOCodec, - - /// Remote endpoint for the peer - remote_endpoint: Endpoint, - - /// The direction of the connection, either `Inbound` or `Outbound` - conn_direction: ConnDirection, - - /// A list of protocol IDs - protocol_ids: RwLock>, - - /// `EventSys` responsible for sending events to the protocols. - protocol_events: ArcEventSys, - - /// This channel is used to send a stop signal to the read loop. - stop_chan: (Sender>, Receiver>), - - /// Managing spawned tasks. - task_group: TaskGroup, -} - -impl Peer { - /// Creates a new peer - pub fn new( - peer_pool: WeakPeerPool, - id: &PeerID, - io_codec: IOCodec, - remote_endpoint: Endpoint, - conn_direction: ConnDirection, - ) -> ArcPeer { - Arc::new(Peer { - id: id.clone(), - peer_pool, - io_codec, - protocol_ids: RwLock::new(Vec::new()), - remote_endpoint, - conn_direction, - protocol_events: EventSys::new(), - task_group: TaskGroup::new(), - stop_chan: channel::bounded(1), - }) - } - - /// Run the peer - pub async fn run(self: Arc, ex: Executor<'_>) -> Result<()> { - self.start_protocols(ex.clone()).await; - self.read_loop().await - } - - /// Send a message to the peer connection using the specified protocol. - pub async fn send(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { - let payload = encode(msg)?; - - let proto_msg = ProtocolMsg { - protocol_id: protocol_id.to_string(), - payload: payload.to_vec(), - }; - - self.io_codec.write(NetMsgCmd::Protocol, &proto_msg).await?; - Ok(()) - } - - /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast(&self, protocol_id: &ProtocolID, msg: &T) { - self.peer_pool().broadcast(protocol_id, msg).await; - } - - /// Shuts down the peer - pub async fn shutdown(&self) { - trace!("peer {} start shutting down", self.id); - - // Send shutdown event to all protocols - for protocol_id in self.protocol_ids.read().await.iter() { - self.protocol_events - .emit_by_topic(protocol_id, &ProtocolEvent::Shutdown) - .await; - } - - // Send a stop signal to the read loop - // - // No need to handle the error here; a dropped channel and - // sending a stop signal have the same effect. - let _ = self.stop_chan.0.try_send(Ok(())); - - // No need to handle the error here - let _ = self - .io_codec - .write(NetMsgCmd::Shutdown, &ShutdownMsg(0)) - .await; - - // Force shutting down - self.task_group.cancel().await; - } - - /// Check if the connection is Inbound - #[inline] - pub fn is_inbound(&self) -> bool { - match self.conn_direction { - ConnDirection::Inbound => true, - ConnDirection::Outbound => false, - } - } - - /// Returns the direction of the connection, which can be either `Inbound` - /// or `Outbound`. - #[inline] - pub fn direction(&self) -> &ConnDirection { - &self.conn_direction - } - - /// Returns the remote endpoint for the peer - #[inline] - pub fn remote_endpoint(&self) -> &Endpoint { - &self.remote_endpoint - } - - /// Return the peer's ID - #[inline] - pub fn id(&self) -> &PeerID { - &self.id - } - - /// Returns the `Config` instance. - pub fn config(&self) -> Arc { - self.peer_pool().config.clone() - } - - /// Registers a listener for the given Protocol `P`. - pub async fn register_listener(&self) -> EventListener { - self.protocol_events.register(&P::id()).await - } - - /// Start a read loop to handle incoming messages from the peer connection. - async fn read_loop(&self) -> Result<()> { - loop { - let fut = select(self.stop_chan.1.recv(), self.io_codec.read()).await; - let result = match fut { - Either::Left(stop_signal) => { - trace!("Peer {} received a stop signal", self.id); - return stop_signal?; - } - Either::Right(result) => result, - }; - - let msg = result?; - - match msg.header.command { - NetMsgCmd::Protocol => { - let msg: ProtocolMsg = decode(&msg.payload)?.0; - - if !self.protocol_ids.read().await.contains(&msg.protocol_id) { - return Err(Error::UnsupportedProtocol(msg.protocol_id)); - } - - let proto_id = &msg.protocol_id; - let msg = ProtocolEvent::Message(msg.payload); - self.protocol_events.emit_by_topic(proto_id, &msg).await; - } - NetMsgCmd::Shutdown => { - return Err(Error::PeerShutdown); - } - command => return Err(Error::InvalidMsg(format!("Unexpected msg {:?}", command))), - } - } - } - - /// Start running the protocols for this peer connection. - async fn start_protocols(self: &Arc, ex: Executor<'_>) { - for (protocol_id, constructor) in self.peer_pool().protocols.read().await.iter() { - trace!("peer {} start protocol {protocol_id}", self.id); - let protocol = constructor(self.clone()); - - self.protocol_ids.write().await.push(protocol_id.clone()); - - let selfc = self.clone(); - let exc = ex.clone(); - let proto_idc = protocol_id.clone(); - - let on_failure = |result: TaskResult>| async move { - if let TaskResult::Completed(res) = result { - if res.is_err() { - error!("protocol {} stopped", proto_idc); - } - // Send a stop signal to read loop - let _ = selfc.stop_chan.0.try_send(res); - } - }; - - self.task_group - .spawn(ex.clone(), protocol.start(exc), on_failure); - } - } - - fn peer_pool(&self) -> ArcPeerPool { - self.peer_pool.upgrade().unwrap() - } -} diff --git a/karyons_p2p/src/peer/peer_id.rs b/karyons_p2p/src/peer/peer_id.rs deleted file mode 100644 index c8aec7d..0000000 --- a/karyons_p2p/src/peer/peer_id.rs +++ /dev/null @@ -1,41 +0,0 @@ -use bincode::{Decode, Encode}; -use rand::{rngs::OsRng, RngCore}; -use sha2::{Digest, Sha256}; - -/// Represents a unique identifier for a peer. -#[derive(Clone, Debug, Eq, PartialEq, Hash, Decode, Encode)] -pub struct PeerID(pub [u8; 32]); - -impl std::fmt::Display for PeerID { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let id = self.0[0..8] - .iter() - .map(|b| format!("{:x}", b)) - .collect::>() - .join(""); - - write!(f, "{}", id) - } -} - -impl PeerID { - /// Creates a new PeerID. - pub fn new(src: &[u8]) -> Self { - let mut hasher = Sha256::new(); - hasher.update(src); - Self(hasher.finalize().into()) - } - - /// Generates a random PeerID. - pub fn random() -> Self { - let mut id: [u8; 32] = [0; 32]; - OsRng.fill_bytes(&mut id); - Self(id) - } -} - -impl From<[u8; 32]> for PeerID { - fn from(b: [u8; 32]) -> Self { - PeerID(b) - } -} diff --git a/karyons_p2p/src/peer_pool.rs b/karyons_p2p/src/peer_pool.rs deleted file mode 100644 index eac4d3d..0000000 --- a/karyons_p2p/src/peer_pool.rs +++ /dev/null @@ -1,337 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, Weak}, - time::Duration, -}; - -use log::{error, info, trace, warn}; -use smol::{ - channel::Sender, - lock::{Mutex, RwLock}, -}; - -use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - utils::decode, - Executor, -}; - -use karyons_net::Conn; - -use crate::{ - config::Config, - io_codec::{CodecMsg, IOCodec}, - message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, - monitor::{Monitor, PeerPoolEvent}, - net::ConnDirection, - net::ConnQueue, - peer::{ArcPeer, Peer, PeerID}, - protocol::{Protocol, ProtocolConstructor, ProtocolID}, - protocols::PingProtocol, - utils::{version_match, Version, VersionInt}, - Error, Result, -}; - -pub type ArcPeerPool = Arc; -pub type WeakPeerPool = Weak; - -pub struct PeerPool { - /// Peer's ID - pub id: PeerID, - - /// Connection queue - conn_queue: Arc, - - /// Holds the running peers. - peers: Mutex>, - - /// Hashmap contains protocol constructors. - pub(crate) protocols: RwLock>>, - - /// Hashmap contains protocol IDs and their versions. - protocol_versions: Arc>>, - - /// Managing spawned tasks. - task_group: TaskGroup, - - /// The Configuration for the P2P network. - pub config: Arc, - - /// Responsible for network and system monitoring. - monitor: Arc, -} - -impl PeerPool { - /// Creates a new PeerPool - pub fn new( - id: &PeerID, - conn_queue: Arc, - config: Arc, - monitor: Arc, - ) -> Arc { - let protocols = RwLock::new(HashMap::new()); - let protocol_versions = Arc::new(RwLock::new(HashMap::new())); - - Arc::new(Self { - id: id.clone(), - conn_queue, - peers: Mutex::new(HashMap::new()), - protocols, - protocol_versions, - task_group: TaskGroup::new(), - monitor, - config, - }) - } - - /// Start - pub async fn start(self: &Arc, ex: Executor<'_>) -> Result<()> { - self.setup_protocols().await?; - let selfc = self.clone(); - self.task_group - .spawn(ex.clone(), selfc.listen_loop(ex.clone()), |_| async {}); - Ok(()) - } - - /// Listens to a new connection from the connection queue - pub async fn listen_loop(self: Arc, ex: Executor<'_>) { - loop { - let new_conn = self.conn_queue.next().await; - let disconnect_signal = new_conn.disconnect_signal; - - let result = self - .new_peer( - new_conn.conn, - &new_conn.direction, - disconnect_signal.clone(), - ex.clone(), - ) - .await; - - if result.is_err() { - let _ = disconnect_signal.send(()).await; - } - } - } - - /// Shuts down - pub async fn shutdown(&self) { - for (_, peer) in self.peers.lock().await.iter() { - peer.shutdown().await; - } - - self.task_group.cancel().await; - } - - /// Attach a custom protocol to the network - pub async fn attach_protocol(&self, c: Box) -> Result<()> { - let protocol_versions = &mut self.protocol_versions.write().await; - let protocols = &mut self.protocols.write().await; - - protocol_versions.insert(P::id(), P::version()?); - protocols.insert(P::id(), Box::new(c) as Box); - Ok(()) - } - - /// Returns the number of currently connected peers. - pub async fn peers_len(&self) -> usize { - self.peers.lock().await.len() - } - - /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast(&self, proto_id: &ProtocolID, msg: &T) { - for (pid, peer) in self.peers.lock().await.iter() { - if let Err(err) = peer.send(proto_id, msg).await { - error!("failed to send msg to {pid}: {err}"); - continue; - } - } - } - - /// Add a new peer to the peer list. - pub async fn new_peer( - self: &Arc, - conn: Conn, - conn_direction: &ConnDirection, - disconnect_signal: Sender<()>, - ex: Executor<'_>, - ) -> Result { - let endpoint = conn.peer_endpoint()?; - let io_codec = IOCodec::new(conn); - - // Do a handshake with a connection before creating a new peer. - let pid = self.do_handshake(&io_codec, conn_direction).await?; - - // TODO: Consider restricting the subnet for inbound connections - if self.contains_peer(&pid).await { - return Err(Error::PeerAlreadyConnected); - } - - // Create a new peer - let peer = Peer::new( - Arc::downgrade(self), - &pid, - io_codec, - endpoint.clone(), - conn_direction.clone(), - ); - - // Insert the new peer - self.peers.lock().await.insert(pid.clone(), peer.clone()); - - let selfc = self.clone(); - let pid_c = pid.clone(); - let on_disconnect = |result| async move { - if let TaskResult::Completed(_) = result { - if let Err(err) = selfc.remove_peer(&pid_c).await { - error!("Failed to remove peer {pid_c}: {err}"); - } - let _ = disconnect_signal.send(()).await; - } - }; - - self.task_group - .spawn(ex.clone(), peer.run(ex.clone()), on_disconnect); - - info!("Add new peer {pid}, direction: {conn_direction}, endpoint: {endpoint}"); - - self.monitor - .notify(&PeerPoolEvent::NewPeer(pid.clone()).into()) - .await; - Ok(pid) - } - - /// Checks if the peer list contains a peer with the given peer id - pub async fn contains_peer(&self, pid: &PeerID) -> bool { - self.peers.lock().await.contains_key(pid) - } - - /// Shuts down the peer and remove it from the peer list. - async fn remove_peer(&self, pid: &PeerID) -> Result<()> { - let mut peers = self.peers.lock().await; - let result = peers.remove(pid); - - drop(peers); - - let peer = match result { - Some(p) => p, - None => return Ok(()), - }; - - peer.shutdown().await; - - self.monitor - .notify(&PeerPoolEvent::RemovePeer(pid.clone()).into()) - .await; - - let endpoint = peer.remote_endpoint(); - let direction = peer.direction(); - - warn!("Peer {pid} removed, direction: {direction}, endpoint: {endpoint}",); - Ok(()) - } - - /// Attach the core protocols. - async fn setup_protocols(&self) -> Result<()> { - self.attach_protocol::(Box::new(PingProtocol::new)) - .await - } - - /// Initiate a handshake with a connection. - async fn do_handshake( - &self, - io_codec: &IOCodec, - conn_direction: &ConnDirection, - ) -> Result { - match conn_direction { - ConnDirection::Inbound => { - let pid = self.wait_vermsg(io_codec).await?; - self.send_verack(io_codec).await?; - Ok(pid) - } - ConnDirection::Outbound => { - self.send_vermsg(io_codec).await?; - self.wait_verack(io_codec).await - } - } - } - - /// Send a Version message - async fn send_vermsg(&self, io_codec: &IOCodec) -> Result<()> { - let pids = self.protocol_versions.read().await; - let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect(); - drop(pids); - - let vermsg = VerMsg { - peer_id: self.id.clone(), - protocols, - version: self.config.version.v.clone(), - }; - - trace!("Send VerMsg"); - io_codec.write(NetMsgCmd::Version, &vermsg).await?; - Ok(()) - } - - /// Wait for a Version message - /// - /// Returns the peer's ID upon successfully receiving the Version message. - async fn wait_vermsg(&self, io_codec: &IOCodec) -> Result { - let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; - - let payload = get_msg_payload!(Version, msg); - let (vermsg, _) = decode::(&payload)?; - - if !version_match(&self.config.version.req, &vermsg.version) { - return Err(Error::IncompatibleVersion("system: {}".into())); - } - - self.protocols_match(&vermsg.protocols).await?; - - trace!("Received VerMsg from: {}", vermsg.peer_id); - Ok(vermsg.peer_id) - } - - /// Send a Verack message - async fn send_verack(&self, io_codec: &IOCodec) -> Result<()> { - let verack = VerAckMsg(self.id.clone()); - - trace!("Send VerAckMsg"); - io_codec.write(NetMsgCmd::Verack, &verack).await?; - Ok(()) - } - - /// Wait for a Verack message - /// - /// Returns the peer's ID upon successfully receiving the Verack message. - async fn wait_verack(&self, io_codec: &IOCodec) -> Result { - let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; - - let payload = get_msg_payload!(Verack, msg); - let (verack, _) = decode::(&payload)?; - - trace!("Received VerAckMsg from: {}", verack.0); - Ok(verack.0) - } - - /// Check if the new connection has compatible protocols. - async fn protocols_match(&self, protocols: &HashMap) -> Result<()> { - for (n, pv) in protocols.iter() { - let pids = self.protocol_versions.read().await; - - match pids.get(n) { - Some(v) => { - if !version_match(&v.req, pv) { - return Err(Error::IncompatibleVersion(format!("{n} protocol: {pv}"))); - } - } - None => { - return Err(Error::UnsupportedProtocol(n.to_string())); - } - } - } - Ok(()) - } -} diff --git a/karyons_p2p/src/protocol.rs b/karyons_p2p/src/protocol.rs deleted file mode 100644 index 515efc6..0000000 --- a/karyons_p2p/src/protocol.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::sync::Arc; - -use async_trait::async_trait; - -use karyons_core::{event::EventValue, Executor}; - -use crate::{peer::ArcPeer, utils::Version, Result}; - -pub type ArcProtocol = Arc; - -pub type ProtocolConstructor = dyn Fn(ArcPeer) -> Arc + Send + Sync; - -pub type ProtocolID = String; - -/// Protocol event -#[derive(Debug, Clone)] -pub enum ProtocolEvent { - /// Message event, contains a vector of bytes. - Message(Vec), - /// Shutdown event signals the protocol to gracefully shut down. - Shutdown, -} - -impl EventValue for ProtocolEvent { - fn id() -> &'static str { - "ProtocolEvent" - } -} - -/// The Protocol trait defines the interface for core protocols -/// and custom protocols. -/// -/// # Example -/// ``` -/// use std::sync::Arc; -/// -/// use async_trait::async_trait; -/// use smol::Executor; -/// -/// use karyons_p2p::{ -/// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent}, -/// Backend, PeerID, Config, Version, P2pError, ArcPeer}; -/// -/// pub struct NewProtocol { -/// peer: ArcPeer, -/// } -/// -/// impl NewProtocol { -/// fn new(peer: ArcPeer) -> ArcProtocol { -/// Arc::new(Self { -/// peer, -/// }) -/// } -/// } -/// -/// #[async_trait] -/// impl Protocol for NewProtocol { -/// async fn start(self: Arc, ex: Arc>) -> Result<(), P2pError> { -/// let listener = self.peer.register_listener::().await; -/// loop { -/// let event = listener.recv().await.unwrap(); -/// -/// match event { -/// ProtocolEvent::Message(msg) => { -/// println!("{:?}", msg); -/// } -/// ProtocolEvent::Shutdown => { -/// break; -/// } -/// } -/// } -/// -/// listener.cancel().await; -/// Ok(()) -/// } -/// -/// fn version() -> Result { -/// "0.2.0, >0.1.0".parse() -/// } -/// -/// fn id() -> ProtocolID { -/// "NEWPROTOCOLID".into() -/// } -/// } -/// -/// async { -/// let peer_id = PeerID::random(); -/// let config = Config::default(); -/// -/// // Create a new Backend -/// let backend = Backend::new(peer_id, config); -/// -/// // Attach the NewProtocol -/// let c = move |peer| NewProtocol::new(peer); -/// backend.attach_protocol::(c).await.unwrap(); -/// }; -/// -/// ``` -#[async_trait] -pub trait Protocol: Send + Sync { - /// Start the protocol - async fn start(self: Arc, ex: Executor<'_>) -> Result<()>; - - /// Returns the version of the protocol. - fn version() -> Result - where - Self: Sized; - - /// Returns the unique ProtocolID associated with the protocol. - fn id() -> ProtocolID - where - Self: Sized; -} diff --git a/karyons_p2p/src/protocols/mod.rs b/karyons_p2p/src/protocols/mod.rs deleted file mode 100644 index 4a8f6b9..0000000 --- a/karyons_p2p/src/protocols/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod ping; - -pub use ping::PingProtocol; diff --git a/karyons_p2p/src/protocols/ping.rs b/karyons_p2p/src/protocols/ping.rs deleted file mode 100644 index b337494..0000000 --- a/karyons_p2p/src/protocols/ping.rs +++ /dev/null @@ -1,173 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use async_trait::async_trait; -use bincode::{Decode, Encode}; -use log::trace; -use rand::{rngs::OsRng, RngCore}; -use smol::{ - channel, - channel::{Receiver, Sender}, - stream::StreamExt, - Timer, -}; - -use karyons_core::{ - async_utils::{select, timeout, Either, TaskGroup, TaskResult}, - event::EventListener, - utils::decode, - Executor, -}; - -use karyons_net::NetError; - -use crate::{ - peer::ArcPeer, - protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID}, - utils::Version, - Result, -}; - -const MAX_FAILUERS: u32 = 3; - -#[derive(Clone, Debug, Encode, Decode)] -enum PingProtocolMsg { - Ping([u8; 32]), - Pong([u8; 32]), -} - -pub struct PingProtocol { - peer: ArcPeer, - ping_interval: u64, - ping_timeout: u64, - task_group: TaskGroup, -} - -impl PingProtocol { - #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer) -> ArcProtocol { - let ping_interval = peer.config().ping_interval; - let ping_timeout = peer.config().ping_timeout; - Arc::new(Self { - peer, - ping_interval, - ping_timeout, - task_group: TaskGroup::new(), - }) - } - - async fn recv_loop( - &self, - listener: &EventListener, - pong_chan: Sender<[u8; 32]>, - ) -> Result<()> { - loop { - let event = listener.recv().await?; - let msg_payload = match event.clone() { - ProtocolEvent::Message(m) => m, - ProtocolEvent::Shutdown => { - break; - } - }; - - let (msg, _) = decode::(&msg_payload)?; - - match msg { - PingProtocolMsg::Ping(nonce) => { - trace!("Received Ping message {:?}", nonce); - self.peer - .send(&Self::id(), &PingProtocolMsg::Pong(nonce)) - .await?; - trace!("Send back Pong message {:?}", nonce); - } - PingProtocolMsg::Pong(nonce) => { - pong_chan.send(nonce).await?; - } - } - } - Ok(()) - } - - async fn ping_loop(self: Arc, chan: Receiver<[u8; 32]>) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.ping_interval)); - let rng = &mut OsRng; - let mut retry = 0; - - while retry < MAX_FAILUERS { - timer.next().await; - - let mut ping_nonce: [u8; 32] = [0; 32]; - rng.fill_bytes(&mut ping_nonce); - - trace!("Send Ping message {:?}", ping_nonce); - self.peer - .send(&Self::id(), &PingProtocolMsg::Ping(ping_nonce)) - .await?; - - let d = Duration::from_secs(self.ping_timeout); - - // Wait for Pong message - let pong_msg = match timeout(d, chan.recv()).await { - Ok(m) => m?, - Err(_) => { - retry += 1; - continue; - } - }; - - trace!("Received Pong message {:?}", pong_msg); - - if pong_msg != ping_nonce { - retry += 1; - continue; - } - } - - Err(NetError::Timeout.into()) - } -} - -#[async_trait] -impl Protocol for PingProtocol { - async fn start(self: Arc, ex: Executor<'_>) -> Result<()> { - trace!("Start Ping protocol"); - let (pong_chan, pong_chan_recv) = channel::bounded(1); - let (stop_signal_s, stop_signal) = channel::bounded::>(1); - - let selfc = self.clone(); - self.task_group.spawn( - ex.clone(), - selfc.clone().ping_loop(pong_chan_recv.clone()), - |res| async move { - if let TaskResult::Completed(result) = res { - let _ = stop_signal_s.send(result).await; - } - }, - ); - - let listener = self.peer.register_listener::().await; - - let result = select(self.recv_loop(&listener, pong_chan), stop_signal.recv()).await; - listener.cancel().await; - self.task_group.cancel().await; - - match result { - Either::Left(res) => { - trace!("Receive loop stopped {:?}", res); - res - } - Either::Right(res) => { - let res = res?; - trace!("Ping loop stopped {:?}", res); - res - } - } - } - - fn version() -> Result { - "0.1.0".parse() - } - - fn id() -> ProtocolID { - "PING".into() - } -} diff --git a/karyons_p2p/src/routing_table/bucket.rs b/karyons_p2p/src/routing_table/bucket.rs deleted file mode 100644 index 13edd24..0000000 --- a/karyons_p2p/src/routing_table/bucket.rs +++ /dev/null @@ -1,123 +0,0 @@ -use super::{Entry, Key}; - -use rand::{rngs::OsRng, seq::SliceRandom}; - -/// BITFLAGS represent the status of an Entry within a bucket. -pub type EntryStatusFlag = u16; - -/// The entry is connected. -pub const CONNECTED_ENTRY: EntryStatusFlag = 0b00001; - -/// The entry is disconnected. This will increase the failure counter. -pub const DISCONNECTED_ENTRY: EntryStatusFlag = 0b00010; - -/// The entry is ready to reconnect, meaning it has either been added and -/// has no connection attempts, or it has been refreshed. -pub const PENDING_ENTRY: EntryStatusFlag = 0b00100; - -/// The entry is unreachable. This will increase the failure counter. -pub const UNREACHABLE_ENTRY: EntryStatusFlag = 0b01000; - -/// The entry is unstable. This will increase the failure counter. -pub const UNSTABLE_ENTRY: EntryStatusFlag = 0b10000; - -#[allow(dead_code)] -pub const ALL_ENTRY: EntryStatusFlag = 0b11111; - -/// A BucketEntry represents a peer in the routing table. -#[derive(Clone, Debug)] -pub struct BucketEntry { - pub status: EntryStatusFlag, - pub entry: Entry, - pub failures: u32, - pub last_seen: i64, -} - -impl BucketEntry { - pub fn is_connected(&self) -> bool { - self.status ^ CONNECTED_ENTRY == 0 - } - - pub fn is_unreachable(&self) -> bool { - self.status ^ UNREACHABLE_ENTRY == 0 - } - - pub fn is_unstable(&self) -> bool { - self.status ^ UNSTABLE_ENTRY == 0 - } -} - -/// The number of entries that can be stored within a single bucket. -pub const BUCKET_SIZE: usize = 20; - -/// A Bucket represents a group of entries in the routing table. -#[derive(Debug, Clone)] -pub struct Bucket { - entries: Vec, -} - -impl Bucket { - /// Creates a new empty Bucket - pub fn new() -> Self { - Self { - entries: Vec::with_capacity(BUCKET_SIZE), - } - } - - /// Add an entry to the bucket. - pub fn add(&mut self, entry: &Entry) { - self.entries.push(BucketEntry { - status: PENDING_ENTRY, - entry: entry.clone(), - failures: 0, - last_seen: chrono::Utc::now().timestamp(), - }) - } - - /// Get the number of entries in the bucket. - pub fn len(&self) -> usize { - self.entries.len() - } - - /// Returns an iterator over the entries in the bucket. - pub fn iter(&self) -> impl Iterator { - self.entries.iter() - } - - /// Remove an entry. - pub fn remove(&mut self, key: &Key) { - let position = self.entries.iter().position(|e| &e.entry.key == key); - if let Some(i) = position { - self.entries.remove(i); - } - } - - /// Returns an iterator of entries in random order. - pub fn random_iter(&self, amount: usize) -> impl Iterator { - self.entries.choose_multiple(&mut OsRng, amount) - } - - /// Updates the status of an entry in the bucket identified by the given key. - /// - /// If the key is not found in the bucket, no action is taken. - /// - /// This will also update the last_seen field and increase the failures - /// counter for the bucket entry according to the new status. - pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { - if let Some(e) = self.entries.iter_mut().find(|e| &e.entry.key == key) { - e.status = entry_flag; - if e.is_unreachable() || e.is_unstable() { - e.failures += 1; - } - - if !e.is_unreachable() { - e.last_seen = chrono::Utc::now().timestamp(); - } - } - } - - /// Check if the bucket contains the given key. - pub fn contains_key(&self, key: &Key) -> bool { - self.entries.iter().any(|e| &e.entry.key == key) - } -} diff --git a/karyons_p2p/src/routing_table/entry.rs b/karyons_p2p/src/routing_table/entry.rs deleted file mode 100644 index b3f219f..0000000 --- a/karyons_p2p/src/routing_table/entry.rs +++ /dev/null @@ -1,41 +0,0 @@ -use bincode::{Decode, Encode}; - -use karyons_net::{Addr, Port}; - -/// Specifies the size of the key, in bytes. -pub const KEY_SIZE: usize = 32; - -/// An Entry represents a peer in the routing table. -#[derive(Encode, Decode, Clone, Debug)] -pub struct Entry { - /// The unique key identifying the peer. - pub key: Key, - /// The IP address of the peer. - pub addr: Addr, - /// TCP port - pub port: Port, - /// UDP/TCP port - pub discovery_port: Port, -} - -impl PartialEq for Entry { - fn eq(&self, other: &Self) -> bool { - // XXX this should also compare both addresses (the self.addr == other.addr) - self.key == other.key - } -} - -/// The unique key identifying the peer. -pub type Key = [u8; KEY_SIZE]; - -/// Calculates the XOR distance between two provided keys. -/// -/// The XOR distance is a metric used in Kademlia to measure the closeness -/// of keys. -pub fn xor_distance(key: &Key, other: &Key) -> Key { - let mut res = [0; 32]; - for (i, (k, o)) in key.iter().zip(other.iter()).enumerate() { - res[i] = k ^ o; - } - res -} diff --git a/karyons_p2p/src/routing_table/mod.rs b/karyons_p2p/src/routing_table/mod.rs deleted file mode 100644 index abf9a08..0000000 --- a/karyons_p2p/src/routing_table/mod.rs +++ /dev/null @@ -1,461 +0,0 @@ -mod bucket; -mod entry; -pub use bucket::{ - Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, - UNREACHABLE_ENTRY, UNSTABLE_ENTRY, -}; -pub use entry::{xor_distance, Entry, Key}; - -use rand::{rngs::OsRng, seq::SliceRandom}; - -use crate::utils::subnet_match; - -use bucket::BUCKET_SIZE; -use entry::KEY_SIZE; - -/// The total number of buckets in the routing table. -const TABLE_SIZE: usize = 32; - -/// The distance limit for the closest buckets. -const DISTANCE_LIMIT: usize = 32; - -/// The maximum number of matched subnets allowed within a single bucket. -const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1; - -/// The maximum number of matched subnets across the entire routing table. -const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6; - -/// Represents the possible result when adding a new entry. -#[derive(Debug)] -pub enum AddEntryResult { - /// The entry is added. - Added, - /// The entry is already exists. - Exists, - /// The entry is ignored. - Ignored, - /// The entry is restricted and not allowed. - Restricted, -} - -/// This is a modified version of the Kademlia Distributed Hash Table (DHT). -/// https://en.wikipedia.org/wiki/Kademlia -#[derive(Debug)] -pub struct RoutingTable { - key: Key, - buckets: Vec, -} - -impl RoutingTable { - /// Creates a new RoutingTable - pub fn new(key: Key) -> Self { - let buckets: Vec = (0..TABLE_SIZE).map(|_| Bucket::new()).collect(); - Self { key, buckets } - } - - /// Adds a new entry to the table and returns a result indicating success, - /// failure, or restrictions. - pub fn add_entry(&mut self, entry: Entry) -> AddEntryResult { - // Determine the index of the bucket where the entry should be placed. - let bucket_idx = match self.bucket_index(&entry.key) { - Some(i) => i, - None => return AddEntryResult::Ignored, - }; - - let bucket = &self.buckets[bucket_idx]; - - // Check if the entry already exists in the bucket. - if bucket.contains_key(&entry.key) { - return AddEntryResult::Exists; - } - - // Check if the entry is restricted. - if self.subnet_restricted(bucket_idx, &entry) { - return AddEntryResult::Restricted; - } - - let bucket = &mut self.buckets[bucket_idx]; - - // If the bucket has free space, add the entry and return success. - if bucket.len() < BUCKET_SIZE { - bucket.add(&entry); - return AddEntryResult::Added; - } - - // If the bucket is full, the entry is ignored. - AddEntryResult::Ignored - } - - /// Check if the table contains the given key. - pub fn contains_key(&self, key: &Key) -> bool { - // Determine the bucket index for the given key. - let bucket_idx = match self.bucket_index(key) { - Some(bi) => bi, - None => return false, - }; - - let bucket = &self.buckets[bucket_idx]; - bucket.contains_key(key) - } - - /// Updates the status of an entry in the routing table identified - /// by the given key. - /// - /// If the key is not found, no action is taken. - pub fn update_entry(&mut self, key: &Key, entry_flag: EntryStatusFlag) { - // Determine the bucket index for the given key. - let bucket_idx = match self.bucket_index(key) { - Some(bi) => bi, - None => return, - }; - - let bucket = &mut self.buckets[bucket_idx]; - bucket.update_entry(key, entry_flag); - } - - /// Returns a list of bucket indexes that are closest to the given target key. - pub fn bucket_indexes(&self, target_key: &Key) -> Vec { - let mut indexes = vec![]; - - // Determine the primary bucket index for the target key. - let bucket_idx = self.bucket_index(target_key).unwrap_or(0); - - indexes.push(bucket_idx); - - // Add additional bucket indexes within a certain distance limit. - for i in 1..DISTANCE_LIMIT { - if bucket_idx >= i && bucket_idx - i >= 1 { - indexes.push(bucket_idx - i); - } - - if bucket_idx + i < (TABLE_SIZE - 1) { - indexes.push(bucket_idx + i); - } - } - - indexes - } - - /// Returns a list of the closest entries to the given target key, limited by max_entries. - pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec { - let mut entries: Vec = vec![]; - - // Collect entries - 'outer: for idx in self.bucket_indexes(target_key) { - let bucket = &self.buckets[idx]; - for bucket_entry in bucket.iter() { - if bucket_entry.is_unreachable() || bucket_entry.is_unstable() { - continue; - } - - entries.push(bucket_entry.entry.clone()); - if entries.len() == max_entries { - break 'outer; - } - } - } - - // Sort the entries by their distance to the target key. - entries.sort_by(|a, b| { - xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key)) - }); - - entries - } - - /// Removes an entry with the given key from the routing table, if it exists. - pub fn remove_entry(&mut self, key: &Key) { - // Determine the bucket index for the given key. - let bucket_idx = match self.bucket_index(key) { - Some(bi) => bi, - None => return, - }; - - let bucket = &mut self.buckets[bucket_idx]; - bucket.remove(key); - } - - /// Returns an iterator of entries. - pub fn iter(&self) -> impl Iterator { - self.buckets.iter() - } - - /// Returns a random entry from the routing table. - pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<&Entry> { - for bucket in self.buckets.choose_multiple(&mut OsRng, self.buckets.len()) { - for entry in bucket.random_iter(bucket.len()) { - if entry.status & entry_flag == 0 { - continue; - } - return Some(&entry.entry); - } - } - - None - } - - // Returns the bucket index for a given key in the table. - fn bucket_index(&self, key: &Key) -> Option { - // Calculate the XOR distance between the self key and the provided key. - let distance = xor_distance(&self.key, key); - - for (i, b) in distance.iter().enumerate() { - if *b != 0 { - let lz = i * 8 + b.leading_zeros() as usize; - let bits = KEY_SIZE * 8 - 1; - let idx = (bits - lz) / 8; - return Some(idx); - } - } - None - } - - /// This function iterate through the routing table and counts how many - /// entries in the same subnet as the given Entry are already present. - /// - /// If the number of matching entries in the same bucket exceeds a - /// threshold (MAX_MATCHED_SUBNET_IN_BUCKET), or if the total count of - /// matching entries in the entire table exceeds a threshold - /// (MAX_MATCHED_SUBNET_IN_TABLE), the addition of the Entry - /// is considered restricted and returns true. - fn subnet_restricted(&self, idx: usize, entry: &Entry) -> bool { - let mut bucket_count = 0; - let mut table_count = 0; - - // Iterate through the routing table's buckets and entries to check - // for subnet matches. - for (i, bucket) in self.buckets.iter().enumerate() { - for e in bucket.iter() { - // If there is a subnet match, update the counts. - let matched = subnet_match(&e.entry.addr, &entry.addr); - if matched { - if i == idx { - bucket_count += 1; - } - table_count += 1; - } - - // If the number of matched entries in the same bucket exceeds - // the limit, return true - if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET { - return true; - } - } - - // If the total matched entries in the table exceed the limit, - // return true. - if table_count >= MAX_MATCHED_SUBNET_IN_TABLE { - return true; - } - } - - // If no subnet restrictions are encountered, return false. - false - } -} - -#[cfg(test)] -mod tests { - use super::bucket::ALL_ENTRY; - use super::*; - - use karyons_net::Addr; - - struct Setup { - local_key: Key, - keys: Vec, - } - - fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry { - Entry { - key: key.clone(), - addr: addr.clone(), - port, - discovery_port, - } - } - - impl Setup { - fn new() -> Self { - let keys = vec![ - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 1, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 1, 1, 0, 1, 1, 2, - ], - [ - 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 3, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0, - 0, 0, 0, 0, 0, 4, - ], - [ - 223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 5, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0, - 0, 0, 0, 0, 0, 6, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0, - 0, 0, 0, 0, 0, 0, 7, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0, - 0, 0, 0, 0, 0, 0, 8, - ], - [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0, - 0, 0, 0, 0, 0, 0, 9, - ], - ]; - - Self { - local_key: [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, - ], - keys, - } - } - - fn entries(&self) -> Vec { - let mut entries = vec![]; - for (i, key) in self.keys.iter().enumerate() { - entries.push(new_entry( - key, - &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()), - 3000, - 3010, - )); - } - entries - } - - fn table(&self) -> RoutingTable { - let mut table = RoutingTable::new(self.local_key.clone()); - - for entry in self.entries() { - let res = table.add_entry(entry); - assert!(matches!(res, AddEntryResult::Added)); - } - - table - } - } - - #[test] - fn test_bucket_index() { - let setup = Setup::new(); - let table = setup.table(); - - assert_eq!(table.bucket_index(&setup.local_key), None); - assert_eq!(table.bucket_index(&setup.keys[0]), Some(0)); - assert_eq!(table.bucket_index(&setup.keys[1]), Some(5)); - assert_eq!(table.bucket_index(&setup.keys[2]), Some(26)); - assert_eq!(table.bucket_index(&setup.keys[3]), Some(11)); - assert_eq!(table.bucket_index(&setup.keys[4]), Some(31)); - assert_eq!(table.bucket_index(&setup.keys[5]), Some(11)); - assert_eq!(table.bucket_index(&setup.keys[6]), Some(12)); - assert_eq!(table.bucket_index(&setup.keys[7]), Some(13)); - assert_eq!(table.bucket_index(&setup.keys[8]), Some(14)); - } - - #[test] - fn test_closest_entries() { - let setup = Setup::new(); - let table = setup.table(); - let entries = setup.entries(); - - assert_eq!( - table.closest_entries(&setup.keys[5], 8), - vec![ - entries[5].clone(), - entries[3].clone(), - entries[1].clone(), - entries[6].clone(), - entries[7].clone(), - entries[8].clone(), - entries[2].clone(), - ] - ); - - assert_eq!( - table.closest_entries(&setup.keys[4], 2), - vec![entries[4].clone(), entries[2].clone()] - ); - } - - #[test] - fn test_random_entry() { - let setup = Setup::new(); - let mut table = setup.table(); - let entries = setup.entries(); - - let entry = table.random_entry(ALL_ENTRY); - assert!(matches!(entry, Some(&_))); - - let entry = table.random_entry(CONNECTED_ENTRY); - assert!(matches!(entry, None)); - - for entry in entries { - table.remove_entry(&entry.key); - } - - let entry = table.random_entry(ALL_ENTRY); - assert!(matches!(entry, None)); - } - - #[test] - fn test_add_entries() { - let setup = Setup::new(); - let mut table = setup.table(); - - let key = [ - 0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 5, - ]; - - let key2 = [ - 0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 5, - ]; - - let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010); - assert!(matches!( - table.add_entry(entry1.clone()), - AddEntryResult::Added - )); - - assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists)); - - let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010); - assert!(matches!( - table.add_entry(entry2), - AddEntryResult::Restricted - )); - - let mut key: [u8; 32] = [0; 32]; - - for i in 0..BUCKET_SIZE { - key[i] += 1; - let entry = new_entry( - &key, - &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()), - 3000, - 3010, - ); - table.add_entry(entry); - } - - key[BUCKET_SIZE] += 1; - let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010); - assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored)); - } -} diff --git a/karyons_p2p/src/utils/mod.rs b/karyons_p2p/src/utils/mod.rs deleted file mode 100644 index e8ff9d0..0000000 --- a/karyons_p2p/src/utils/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod version; - -pub use version::{version_match, Version, VersionInt}; - -use std::net::IpAddr; - -use karyons_net::Addr; - -/// Check if two addresses belong to the same subnet. -pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { - match (addr, other_addr) { - (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => { - // XXX Consider moving this to a different location - if other_ip.is_loopback() && ip.is_loopback() { - return false; - } - ip.octets()[0..3] == other_ip.octets()[0..3] - } - _ => false, - } -} diff --git a/karyons_p2p/src/utils/version.rs b/karyons_p2p/src/utils/version.rs deleted file mode 100644 index 4986495..0000000 --- a/karyons_p2p/src/utils/version.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::str::FromStr; - -use bincode::{Decode, Encode}; -use semver::VersionReq; - -use crate::{Error, Result}; - -/// Represents the network version and protocol version used in Karyons p2p. -/// -/// # Example -/// -/// ``` -/// use karyons_p2p::Version; -/// -/// let version: Version = "0.2.0, >0.1.0".parse().unwrap(); -/// -/// let version: Version = "0.2.0".parse().unwrap(); -/// -/// ``` -#[derive(Debug, Clone)] -pub struct Version { - pub v: VersionInt, - pub req: VersionReq, -} - -impl Version { - /// Creates a new Version - pub fn new(v: VersionInt, req: VersionReq) -> Self { - Self { v, req } - } -} - -#[derive(Debug, Decode, Encode, Clone)] -pub struct VersionInt { - major: u64, - minor: u64, - patch: u64, -} - -impl FromStr for Version { - type Err = Error; - - fn from_str(s: &str) -> Result { - let v: Vec<&str> = s.split(", ").collect(); - if v.is_empty() || v.len() > 2 { - return Err(Error::ParseError(format!("Invalid version{s}"))); - } - - let version: VersionInt = v[0].parse()?; - let req: VersionReq = if v.len() > 1 { v[1] } else { v[0] }.parse()?; - - Ok(Self { v: version, req }) - } -} - -impl FromStr for VersionInt { - type Err = Error; - - fn from_str(s: &str) -> Result { - let v: Vec<&str> = s.split('.').collect(); - if v.len() < 2 || v.len() > 3 { - return Err(Error::ParseError(format!("Invalid version{s}"))); - } - - let major = v[0].parse::()?; - let minor = v[1].parse::()?; - let patch = v.get(2).unwrap_or(&"0").parse::()?; - - Ok(Self { - major, - minor, - patch, - }) - } -} - -impl std::fmt::Display for VersionInt { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}.{}.{}", self.major, self.minor, self.patch) - } -} - -impl From for semver::Version { - fn from(v: VersionInt) -> Self { - semver::Version::new(v.major, v.minor, v.patch) - } -} - -/// Check if a version satisfies a version request. -pub fn version_match(version_req: &VersionReq, version: &VersionInt) -> bool { - let version: semver::Version = version.clone().into(); - version_req.matches(&version) -} -- cgit v1.2.3