aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer_pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/peer_pool.rs')
-rw-r--r--p2p/src/peer_pool.rs46
1 files changed, 21 insertions, 25 deletions
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index a0079f2..dd7e669 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -11,23 +11,23 @@ use smol::{
};
use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
- utils::decode,
+ async_util::{TaskGroup, TaskResult},
+ util::decode,
GlobalExecutor,
};
use karyons_net::Conn;
use crate::{
+ codec::{Codec, CodecMsg},
config::Config,
connection::{ConnDirection, ConnQueue},
- io_codec::{CodecMsg, IOCodec},
message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
monitor::{Monitor, PeerPoolEvent},
peer::{ArcPeer, Peer, PeerID},
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
- utils::{version_match, Version, VersionInt},
+ version::{version_match, Version, VersionInt},
Error, Result,
};
@@ -155,10 +155,10 @@ impl PeerPool {
disconnect_signal: Sender<Result<()>>,
) -> Result<()> {
let endpoint = conn.peer_endpoint()?;
- let io_codec = IOCodec::new(conn);
+ let codec = Codec::new(conn);
// Do a handshake with the connection before creating a new peer.
- let pid = self.do_handshake(&io_codec, conn_direction).await?;
+ let pid = self.do_handshake(&codec, conn_direction).await?;
// TODO: Consider restricting the subnet for inbound connections
if self.contains_peer(&pid).await {
@@ -169,7 +169,7 @@ impl PeerPool {
let peer = Peer::new(
Arc::downgrade(self),
&pid,
- io_codec,
+ codec,
endpoint.clone(),
conn_direction.clone(),
self.executor.clone(),
@@ -235,20 +235,16 @@ impl PeerPool {
}
/// Initiate a handshake with a connection.
- async fn do_handshake(
- &self,
- io_codec: &IOCodec,
- conn_direction: &ConnDirection,
- ) -> Result<PeerID> {
+ async fn do_handshake(&self, codec: &Codec, conn_direction: &ConnDirection) -> Result<PeerID> {
match conn_direction {
ConnDirection::Inbound => {
- let result = self.wait_vermsg(io_codec).await;
+ let result = self.wait_vermsg(codec).await;
match result {
Ok(_) => {
- self.send_verack(io_codec, true).await?;
+ self.send_verack(codec, true).await?;
}
Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
- self.send_verack(io_codec, false).await?;
+ self.send_verack(codec, false).await?;
}
_ => {}
}
@@ -256,14 +252,14 @@ impl PeerPool {
}
ConnDirection::Outbound => {
- self.send_vermsg(io_codec).await?;
- self.wait_verack(io_codec).await
+ self.send_vermsg(codec).await?;
+ self.wait_verack(codec).await
}
}
}
/// Send a Version message
- async fn send_vermsg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_vermsg(&self, codec: &Codec) -> Result<()> {
let pids = self.protocol_versions.read().await;
let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect();
drop(pids);
@@ -275,16 +271,16 @@ impl PeerPool {
};
trace!("Send VerMsg");
- io_codec.write(NetMsgCmd::Version, &vermsg).await?;
+ codec.write(NetMsgCmd::Version, &vermsg).await?;
Ok(())
}
/// Wait for a Version message
///
/// Returns the peer's ID upon successfully receiving the Version message.
- async fn wait_vermsg(&self, io_codec: &IOCodec) -> Result<PeerID> {
+ async fn wait_vermsg(&self, codec: &Codec) -> Result<PeerID> {
let timeout = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = io_codec.read_timeout(timeout).await?;
+ let msg: NetMsg = codec.read_timeout(timeout).await?;
let payload = get_msg_payload!(Version, msg);
let (vermsg, _) = decode::<VerMsg>(&payload)?;
@@ -300,23 +296,23 @@ impl PeerPool {
}
/// Send a Verack message
- async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> {
+ async fn send_verack(&self, codec: &Codec, ack: bool) -> Result<()> {
let verack = VerAckMsg {
peer_id: self.id.clone(),
ack,
};
trace!("Send VerAckMsg {:?}", verack);
- io_codec.write(NetMsgCmd::Verack, &verack).await?;
+ codec.write(NetMsgCmd::Verack, &verack).await?;
Ok(())
}
/// Wait for a Verack message
///
/// Returns the peer's ID upon successfully receiving the Verack message.
- async fn wait_verack(&self, io_codec: &IOCodec) -> Result<PeerID> {
+ async fn wait_verack(&self, codec: &Codec) -> Result<PeerID> {
let timeout = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = io_codec.read_timeout(timeout).await?;
+ let msg: NetMsg = codec.read_timeout(timeout).await?;
let payload = get_msg_payload!(Verack, msg);
let (verack, _) = decode::<VerAckMsg>(&payload)?;