aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/peer/mod.rs')
-rw-r--r--p2p/src/peer/mod.rs23
1 files changed, 10 insertions, 13 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 85cd558..6ed0dd8 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -11,17 +11,17 @@ use smol::{
};
use karyons_core::{
- async_utils::{select, Either, TaskGroup, TaskResult},
+ async_util::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
- utils::{decode, encode},
+ util::{decode, encode},
GlobalExecutor,
};
use karyons_net::Endpoint;
use crate::{
+ codec::{Codec, CodecMsg},
connection::ConnDirection,
- io_codec::{CodecMsg, IOCodec},
message::{NetMsgCmd, ProtocolMsg, ShutdownMsg},
peer_pool::{ArcPeerPool, WeakPeerPool},
protocol::{Protocol, ProtocolEvent, ProtocolID},
@@ -37,8 +37,8 @@ pub struct Peer {
/// A weak pointer to `PeerPool`
peer_pool: WeakPeerPool,
- /// Holds the IOCodec for the peer connection
- io_codec: IOCodec,
+ /// Holds the Codec for the peer connection
+ codec: Codec,
/// Remote endpoint for the peer
remote_endpoint: Endpoint,
@@ -64,7 +64,7 @@ impl Peer {
pub fn new(
peer_pool: WeakPeerPool,
id: &PeerID,
- io_codec: IOCodec,
+ codec: Codec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
ex: GlobalExecutor,
@@ -72,7 +72,7 @@ impl Peer {
Arc::new(Peer {
id: id.clone(),
peer_pool,
- io_codec,
+ codec,
protocol_ids: RwLock::new(Vec::new()),
remote_endpoint,
conn_direction,
@@ -97,7 +97,7 @@ impl Peer {
payload: payload.to_vec(),
};
- self.io_codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
+ self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
Ok(())
}
@@ -124,10 +124,7 @@ impl Peer {
let _ = self.stop_chan.0.try_send(Ok(()));
// No need to handle the error here
- let _ = self
- .io_codec
- .write(NetMsgCmd::Shutdown, &ShutdownMsg(0))
- .await;
+ let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await;
// Force shutting down
self.task_group.cancel().await;
@@ -174,7 +171,7 @@ impl Peer {
/// Start a read loop to handle incoming messages from the peer connection.
async fn read_loop(&self) -> Result<()> {
loop {
- let fut = select(self.stop_chan.1.recv(), self.io_codec.read()).await;
+ let fut = select(self.stop_chan.1.recv(), self.codec.read()).await;
let result = match fut {
Either::Left(stop_signal) => {
trace!("Peer {} received a stop signal", self.id);