aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/discovery
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-28 22:41:33 +0300
committerhozan23 <hozan23@proton.me>2023-11-28 22:41:33 +0300
commit98a1de91a2dae06323558422c239e5a45fc86e7b (patch)
tree38c640248824fcb3b4ca5ba12df47c13ef26ccda /p2p/src/discovery
parentca2a5f8bbb6983d9555abd10eaaf86950b794957 (diff)
implement TLS for inbound and outbound connections
Diffstat (limited to 'p2p/src/discovery')
-rw-r--r--p2p/src/discovery/lookup.rs71
-rw-r--r--p2p/src/discovery/mod.rs30
-rw-r--r--p2p/src/discovery/refresh.rs4
3 files changed, 73 insertions, 32 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 0138068..60d8635 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,13 +5,13 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor};
+use karyons_core::{async_util::timeout, key_pair::KeyPair, util::decode, GlobalExecutor};
use karyons_net::{Conn, Endpoint};
use crate::{
+ codec::Codec,
connector::Connector,
- io_codec::IOCodec,
listener::Listener,
message::{
get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg,
@@ -20,7 +20,7 @@ use crate::{
monitor::{ConnEvent, DiscoveryEvent, Monitor},
routing_table::RoutingTable,
slots::ConnectionSlots,
- utils::version_match,
+ version::version_match,
Config, Error, PeerID, Result,
};
@@ -55,6 +55,7 @@ pub struct LookupService {
impl LookupService {
/// Creates a new lookup service
pub fn new(
+ key_pair: &KeyPair,
id: &PeerID,
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
@@ -64,11 +65,19 @@ impl LookupService {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
- let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
+ let listener = Listener::new(
+ key_pair,
+ inbound_slots.clone(),
+ config.enable_tls,
+ monitor.clone(),
+ ex.clone(),
+ );
let connector = Connector::new(
+ key_pair,
config.lookup_connect_retries,
outbound_slots.clone(),
+ config.enable_tls,
monitor.clone(),
ex,
);
@@ -116,14 +125,17 @@ impl LookupService {
/// randomly generated peer ID. Upon receiving peers from the initial lookup,
/// it starts connecting to these received peers and sends them a FindPeer
/// message that contains our own peer ID.
- pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> {
+ pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
trace!("Lookup started {endpoint}");
self.monitor
.notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into())
.await;
let mut random_peers = vec![];
- if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await {
+ if let Err(err) = self
+ .random_lookup(endpoint, peer_id, &mut random_peers)
+ .await
+ {
self.monitor
.notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into())
.await;
@@ -160,11 +172,14 @@ impl LookupService {
async fn random_lookup(
&self,
endpoint: &Endpoint,
+ peer_id: Option<PeerID>,
random_peers: &mut Vec<PeerMsg>,
) -> Result<()> {
for _ in 0..2 {
- let peer_id = PeerID::random();
- let peers = self.connect(&peer_id, endpoint.clone()).await?;
+ let random_peer_id = PeerID::random();
+ let peers = self
+ .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
+ .await?;
let table = self.table.lock().await;
for peer in peers {
@@ -187,7 +202,7 @@ impl LookupService {
let mut tasks = FuturesUnordered::new();
for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
- tasks.push(self.connect(&self.id, endpoint))
+ tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
}
while let Some(result) = tasks.next().await {
@@ -200,11 +215,17 @@ impl LookupService {
}
}
- /// Connects to the given endpoint
- async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> {
- let conn = self.connector.connect(&endpoint).await?;
- let io_codec = IOCodec::new(conn);
- let result = self.handle_outbound(io_codec, peer_id).await;
+ /// Connects to the given endpoint and initiates a lookup process for the
+ /// provided peer ID.
+ async fn connect(
+ &self,
+ endpoint: Endpoint,
+ peer_id: Option<PeerID>,
+ target_peer_id: &PeerID,
+ ) -> Result<Vec<PeerMsg>> {
+ let conn = self.connector.connect(&endpoint, &peer_id).await?;
+ let io_codec = Codec::new(conn);
+ let result = self.handle_outbound(io_codec, target_peer_id).await;
self.monitor
.notify(&ConnEvent::Disconnected(endpoint).into())
@@ -215,12 +236,16 @@ impl LookupService {
}
/// Handles outbound connection
- async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> {
+ async fn handle_outbound(
+ &self,
+ io_codec: Codec,
+ target_peer_id: &PeerID,
+ ) -> Result<Vec<PeerMsg>> {
trace!("Send Ping msg");
self.send_ping_msg(&io_codec).await?;
trace!("Send FindPeer msg");
- let peers = self.send_findpeer_msg(&io_codec, peer_id).await?;
+ let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?;
if peers.0.len() >= MAX_PEERS_IN_PEERSMSG {
return Err(Error::Lookup("Received too many peers in PeersMsg"));
@@ -260,7 +285,7 @@ impl LookupService {
/// Handles inbound connection
async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> {
- let io_codec = IOCodec::new(conn);
+ let io_codec = Codec::new(conn);
loop {
let msg: NetMsg = io_codec.read().await?;
trace!("Receive msg {:?}", msg.header.command);
@@ -293,7 +318,7 @@ impl LookupService {
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> {
trace!("Send Pong msg");
let mut nonce: [u8; 32] = [0; 32];
@@ -319,14 +344,14 @@ impl LookupService {
}
/// Sends a Pong msg
- async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> {
+ async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> {
trace!("Send Pong msg");
io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?;
Ok(())
}
/// Sends a FindPeer msg and wait to receivet the Peers msg.
- async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> {
+ async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> {
trace!("Send FindPeer msg");
io_codec
.write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone()))
@@ -342,7 +367,7 @@ impl LookupService {
}
/// Sends a Peers msg.
- async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> {
+ async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> {
trace!("Send Peers msg");
let table = self.table.lock().await;
let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
@@ -354,7 +379,7 @@ impl LookupService {
}
/// Sends a Peer msg.
- async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> {
+ async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> {
trace!("Send Peer msg");
let peer_msg = PeerMsg {
addr: endpoint.addr()?.clone(),
@@ -367,7 +392,7 @@ impl LookupService {
}
/// Sends a Shutdown msg.
- async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> {
trace!("Send Shutdown msg");
io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?;
Ok(())
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7f55309..2c1bcd8 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use smol::lock::Mutex;
use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
@@ -66,6 +67,7 @@ pub struct Discovery {
impl Discovery {
/// Creates a new Discovery
pub fn new(
+ key_pair: &KeyPair,
peer_id: &PeerID,
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
@@ -81,6 +83,7 @@ impl Discovery {
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
let lookup_service = LookupService::new(
+ key_pair,
peer_id,
table.clone(),
config.clone(),
@@ -89,12 +92,21 @@ impl Discovery {
);
let connector = Connector::new(
+ key_pair,
config.max_connect_retries,
outbound_slots.clone(),
+ config.enable_tls,
+ monitor.clone(),
+ ex.clone(),
+ );
+
+ let listener = Listener::new(
+ key_pair,
+ inbound_slots.clone(),
+ config.enable_tls,
monitor.clone(),
ex.clone(),
);
- let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
Arc::new(Self {
refresh_service: Arc::new(refresh_service),
@@ -222,7 +234,7 @@ impl Discovery {
selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
- // TODO
+ // TODO: Use the appropriate status.
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
@@ -236,10 +248,13 @@ impl Discovery {
Ok(())
};
- let res = self.connector.connect_with_cback(endpoint, cback).await;
+ let result = self
+ .connector
+ .connect_with_cback(endpoint, &pid, cback)
+ .await;
if let Some(pid) = &pid {
- match res {
+ match result {
Ok(_) => {
self.update_entry(pid, CONNECTED_ENTRY).await;
}
@@ -260,7 +275,8 @@ impl Discovery {
match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
- if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
+ let peer_id = Some(entry.key.into());
+ if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
error!("Failed to do lookup: {endpoint}: {err}");
}
@@ -268,7 +284,7 @@ impl Discovery {
None => {
let peers = &self.config.bootstrap_peers;
for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
- if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
+ if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
error!("Failed to do lookup: {endpoint}: {err}");
}
}
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index d095f19..f797c71 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -10,8 +10,8 @@ use smol::{
};
use karyons_core::{
- async_utils::{timeout, Backoff, TaskGroup, TaskResult},
- utils::{decode, encode},
+ async_util::{timeout, Backoff, TaskGroup, TaskResult},
+ util::{decode, encode},
GlobalExecutor,
};