From 98a1de91a2dae06323558422c239e5a45fc86e7b Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 Nov 2023 22:41:33 +0300 Subject: implement TLS for inbound and outbound connections --- p2p/src/discovery/lookup.rs | 71 ++++++++++++++++++++++++++++++-------------- p2p/src/discovery/mod.rs | 30 ++++++++++++++----- p2p/src/discovery/refresh.rs | 4 +-- 3 files changed, 73 insertions(+), 32 deletions(-) (limited to 'p2p/src/discovery') diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 0138068..60d8635 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,13 +5,13 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use smol::lock::{Mutex, RwLock}; -use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor}; +use karyons_core::{async_util::timeout, key_pair::KeyPair, util::decode, GlobalExecutor}; use karyons_net::{Conn, Endpoint}; use crate::{ + codec::Codec, connector::Connector, - io_codec::IOCodec, listener::Listener, message::{ get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, @@ -20,7 +20,7 @@ use crate::{ monitor::{ConnEvent, DiscoveryEvent, Monitor}, routing_table::RoutingTable, slots::ConnectionSlots, - utils::version_match, + version::version_match, Config, Error, PeerID, Result, }; @@ -55,6 +55,7 @@ pub struct LookupService { impl LookupService { /// Creates a new lookup service pub fn new( + key_pair: &KeyPair, id: &PeerID, table: Arc>, config: Arc, @@ -64,11 +65,19 @@ impl LookupService { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); - let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); + let listener = Listener::new( + key_pair, + inbound_slots.clone(), + config.enable_tls, + monitor.clone(), + ex.clone(), + ); let connector = Connector::new( + key_pair, config.lookup_connect_retries, outbound_slots.clone(), + config.enable_tls, monitor.clone(), ex, ); @@ -116,14 +125,17 @@ impl LookupService { /// randomly generated peer ID. Upon receiving peers from the initial lookup, /// it starts connecting to these received peers and sends them a FindPeer /// message that contains our own peer ID. - pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> { + pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option) -> Result<()> { trace!("Lookup started {endpoint}"); self.monitor .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) .await; let mut random_peers = vec![]; - if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await { + if let Err(err) = self + .random_lookup(endpoint, peer_id, &mut random_peers) + .await + { self.monitor .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) .await; @@ -160,11 +172,14 @@ impl LookupService { async fn random_lookup( &self, endpoint: &Endpoint, + peer_id: Option, random_peers: &mut Vec, ) -> Result<()> { for _ in 0..2 { - let peer_id = PeerID::random(); - let peers = self.connect(&peer_id, endpoint.clone()).await?; + let random_peer_id = PeerID::random(); + let peers = self + .connect(endpoint.clone(), peer_id.clone(), &random_peer_id) + .await?; let table = self.table.lock().await; for peer in peers { @@ -187,7 +202,7 @@ impl LookupService { let mut tasks = FuturesUnordered::new(); for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); - tasks.push(self.connect(&self.id, endpoint)) + tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id)) } while let Some(result) = tasks.next().await { @@ -200,11 +215,17 @@ impl LookupService { } } - /// Connects to the given endpoint - async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result> { - let conn = self.connector.connect(&endpoint).await?; - let io_codec = IOCodec::new(conn); - let result = self.handle_outbound(io_codec, peer_id).await; + /// Connects to the given endpoint and initiates a lookup process for the + /// provided peer ID. + async fn connect( + &self, + endpoint: Endpoint, + peer_id: Option, + target_peer_id: &PeerID, + ) -> Result> { + let conn = self.connector.connect(&endpoint, &peer_id).await?; + let io_codec = Codec::new(conn); + let result = self.handle_outbound(io_codec, target_peer_id).await; self.monitor .notify(&ConnEvent::Disconnected(endpoint).into()) @@ -215,12 +236,16 @@ impl LookupService { } /// Handles outbound connection - async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result> { + async fn handle_outbound( + &self, + io_codec: Codec, + target_peer_id: &PeerID, + ) -> Result> { trace!("Send Ping msg"); self.send_ping_msg(&io_codec).await?; trace!("Send FindPeer msg"); - let peers = self.send_findpeer_msg(&io_codec, peer_id).await?; + let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?; if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { return Err(Error::Lookup("Received too many peers in PeersMsg")); @@ -260,7 +285,7 @@ impl LookupService { /// Handles inbound connection async fn handle_inbound(self: &Arc, conn: Conn) -> Result<()> { - let io_codec = IOCodec::new(conn); + let io_codec = Codec::new(conn); loop { let msg: NetMsg = io_codec.read().await?; trace!("Receive msg {:?}", msg.header.command); @@ -293,7 +318,7 @@ impl LookupService { } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> { trace!("Send Pong msg"); let mut nonce: [u8; 32] = [0; 32]; @@ -319,14 +344,14 @@ impl LookupService { } /// Sends a Pong msg - async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> { + async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> { trace!("Send Pong msg"); io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; Ok(()) } /// Sends a FindPeer msg and wait to receivet the Peers msg. - async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result { + async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result { trace!("Send FindPeer msg"); io_codec .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) @@ -342,7 +367,7 @@ impl LookupService { } /// Sends a Peers msg. - async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> { + async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> { trace!("Send Peers msg"); let table = self.table.lock().await; let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); @@ -354,7 +379,7 @@ impl LookupService { } /// Sends a Peer msg. - async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> { + async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> { trace!("Send Peer msg"); let peer_msg = PeerMsg { addr: endpoint.addr()?.clone(), @@ -367,7 +392,7 @@ impl LookupService { } /// Sends a Shutdown msg. - async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> { trace!("Send Shutdown msg"); io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; Ok(()) diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7f55309..2c1bcd8 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom}; use smol::lock::Mutex; use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; @@ -66,6 +67,7 @@ pub struct Discovery { impl Discovery { /// Creates a new Discovery pub fn new( + key_pair: &KeyPair, peer_id: &PeerID, conn_queue: Arc, config: Arc, @@ -81,6 +83,7 @@ impl Discovery { let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); let lookup_service = LookupService::new( + key_pair, peer_id, table.clone(), config.clone(), @@ -89,12 +92,21 @@ impl Discovery { ); let connector = Connector::new( + key_pair, config.max_connect_retries, outbound_slots.clone(), + config.enable_tls, + monitor.clone(), + ex.clone(), + ); + + let listener = Listener::new( + key_pair, + inbound_slots.clone(), + config.enable_tls, monitor.clone(), ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -222,7 +234,7 @@ impl Discovery { selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; } Err(Error::PeerAlreadyConnected) => { - // TODO + // TODO: Use the appropriate status. selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; } Err(_) => { @@ -236,10 +248,13 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(endpoint, cback).await; + let result = self + .connector + .connect_with_cback(endpoint, &pid, cback) + .await; if let Some(pid) = &pid { - match res { + match result { Ok(_) => { self.update_entry(pid, CONNECTED_ENTRY).await; } @@ -260,7 +275,8 @@ impl Discovery { match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); - if let Err(err) = self.lookup_service.start_lookup(&endpoint).await { + let peer_id = Some(entry.key.into()); + if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; error!("Failed to do lookup: {endpoint}: {err}"); } @@ -268,7 +284,7 @@ impl Discovery { None => { let peers = &self.config.bootstrap_peers; for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) { - if let Err(err) = self.lookup_service.start_lookup(endpoint).await { + if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await { error!("Failed to do lookup: {endpoint}: {err}"); } } diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index d095f19..f797c71 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -10,8 +10,8 @@ use smol::{ }; use karyons_core::{ - async_utils::{timeout, Backoff, TaskGroup, TaskResult}, - utils::{decode, encode}, + async_util::{timeout, Backoff, TaskGroup, TaskResult}, + util::{decode, encode}, GlobalExecutor, }; -- cgit v1.2.3