From 98a1de91a2dae06323558422c239e5a45fc86e7b Mon Sep 17 00:00:00 2001 From: hozan23 Date: Tue, 28 Nov 2023 22:41:33 +0300 Subject: implement TLS for inbound and outbound connections --- p2p/src/peer_pool.rs | 46 +++++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 25 deletions(-) (limited to 'p2p/src/peer_pool.rs') diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index a0079f2..dd7e669 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -11,23 +11,23 @@ use smol::{ }; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - utils::decode, + async_util::{TaskGroup, TaskResult}, + util::decode, GlobalExecutor, }; use karyons_net::Conn; use crate::{ + codec::{Codec, CodecMsg}, config::Config, connection::{ConnDirection, ConnQueue}, - io_codec::{CodecMsg, IOCodec}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, monitor::{Monitor, PeerPoolEvent}, peer::{ArcPeer, Peer, PeerID}, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, - utils::{version_match, Version, VersionInt}, + version::{version_match, Version, VersionInt}, Error, Result, }; @@ -155,10 +155,10 @@ impl PeerPool { disconnect_signal: Sender>, ) -> Result<()> { let endpoint = conn.peer_endpoint()?; - let io_codec = IOCodec::new(conn); + let codec = Codec::new(conn); // Do a handshake with the connection before creating a new peer. - let pid = self.do_handshake(&io_codec, conn_direction).await?; + let pid = self.do_handshake(&codec, conn_direction).await?; // TODO: Consider restricting the subnet for inbound connections if self.contains_peer(&pid).await { @@ -169,7 +169,7 @@ impl PeerPool { let peer = Peer::new( Arc::downgrade(self), &pid, - io_codec, + codec, endpoint.clone(), conn_direction.clone(), self.executor.clone(), @@ -235,20 +235,16 @@ impl PeerPool { } /// Initiate a handshake with a connection. - async fn do_handshake( - &self, - io_codec: &IOCodec, - conn_direction: &ConnDirection, - ) -> Result { + async fn do_handshake(&self, codec: &Codec, conn_direction: &ConnDirection) -> Result { match conn_direction { ConnDirection::Inbound => { - let result = self.wait_vermsg(io_codec).await; + let result = self.wait_vermsg(codec).await; match result { Ok(_) => { - self.send_verack(io_codec, true).await?; + self.send_verack(codec, true).await?; } Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => { - self.send_verack(io_codec, false).await?; + self.send_verack(codec, false).await?; } _ => {} } @@ -256,14 +252,14 @@ impl PeerPool { } ConnDirection::Outbound => { - self.send_vermsg(io_codec).await?; - self.wait_verack(io_codec).await + self.send_vermsg(codec).await?; + self.wait_verack(codec).await } } } /// Send a Version message - async fn send_vermsg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_vermsg(&self, codec: &Codec) -> Result<()> { let pids = self.protocol_versions.read().await; let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect(); drop(pids); @@ -275,16 +271,16 @@ impl PeerPool { }; trace!("Send VerMsg"); - io_codec.write(NetMsgCmd::Version, &vermsg).await?; + codec.write(NetMsgCmd::Version, &vermsg).await?; Ok(()) } /// Wait for a Version message /// /// Returns the peer's ID upon successfully receiving the Version message. - async fn wait_vermsg(&self, io_codec: &IOCodec) -> Result { + async fn wait_vermsg(&self, codec: &Codec) -> Result { let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; + let msg: NetMsg = codec.read_timeout(timeout).await?; let payload = get_msg_payload!(Version, msg); let (vermsg, _) = decode::(&payload)?; @@ -300,23 +296,23 @@ impl PeerPool { } /// Send a Verack message - async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> { + async fn send_verack(&self, codec: &Codec, ack: bool) -> Result<()> { let verack = VerAckMsg { peer_id: self.id.clone(), ack, }; trace!("Send VerAckMsg {:?}", verack); - io_codec.write(NetMsgCmd::Verack, &verack).await?; + codec.write(NetMsgCmd::Verack, &verack).await?; Ok(()) } /// Wait for a Verack message /// /// Returns the peer's ID upon successfully receiving the Verack message. - async fn wait_verack(&self, io_codec: &IOCodec) -> Result { + async fn wait_verack(&self, codec: &Codec) -> Result { let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; + let msg: NetMsg = codec.read_timeout(timeout).await?; let payload = get_msg_payload!(Verack, msg); let (verack, _) = decode::(&payload)?; -- cgit v1.2.3