aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/peer/mod.rs')
-rw-r--r--p2p/src/peer/mod.rs42
1 files changed, 22 insertions, 20 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index ca68530..f0f6f17 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -4,24 +4,22 @@ pub use peer_id::PeerID;
use std::sync::Arc;
+use async_channel::{Receiver, Sender};
+use bincode::{Decode, Encode};
use log::{error, trace};
-use smol::{
- channel::{self, Receiver, Sender},
- lock::RwLock,
-};
use karyon_core::{
- async_util::{select, Either, Executor, TaskGroup, TaskResult},
+ async_runtime::{lock::RwLock, Executor},
+ async_util::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
util::{decode, encode},
};
-use karyon_net::Endpoint;
+use karyon_net::{Conn, Endpoint};
use crate::{
- codec::{Codec, CodecMsg},
connection::ConnDirection,
- message::{NetMsgCmd, ProtocolMsg, ShutdownMsg},
+ message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
peer_pool::{ArcPeerPool, WeakPeerPool},
protocol::{Protocol, ProtocolEvent, ProtocolID},
Config, Error, Result,
@@ -36,8 +34,8 @@ pub struct Peer {
/// A weak pointer to `PeerPool`
peer_pool: WeakPeerPool,
- /// Holds the Codec for the peer connection
- codec: Codec,
+ /// Holds the peer connection
+ conn: Conn<NetMsg>,
/// Remote endpoint for the peer
remote_endpoint: Endpoint,
@@ -55,7 +53,7 @@ pub struct Peer {
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
/// Managing spawned tasks.
- task_group: TaskGroup<'static>,
+ task_group: TaskGroup,
}
impl Peer {
@@ -63,21 +61,21 @@ impl Peer {
pub fn new(
peer_pool: WeakPeerPool,
id: &PeerID,
- codec: Codec,
+ conn: Conn<NetMsg>,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
- ex: Executor<'static>,
+ ex: Executor,
) -> ArcPeer {
Arc::new(Peer {
id: id.clone(),
peer_pool,
- codec,
+ conn,
protocol_ids: RwLock::new(Vec::new()),
remote_endpoint,
conn_direction,
protocol_events: EventSys::new(),
task_group: TaskGroup::with_executor(ex),
- stop_chan: channel::bounded(1),
+ stop_chan: async_channel::bounded(1),
})
}
@@ -88,7 +86,7 @@ impl Peer {
}
/// Send a message to the peer connection using the specified protocol.
- pub async fn send<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {
+ pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {
let payload = encode(msg)?;
let proto_msg = ProtocolMsg {
@@ -96,12 +94,14 @@ impl Peer {
payload: payload.to_vec(),
};
- self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
+ self.conn
+ .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?)
+ .await?;
Ok(())
}
/// Broadcast a message to all connected peers using the specified protocol.
- pub async fn broadcast<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) {
+ pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) {
self.peer_pool().broadcast(protocol_id, msg).await;
}
@@ -123,7 +123,9 @@ impl Peer {
let _ = self.stop_chan.0.try_send(Ok(()));
// No need to handle the error here
- let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await;
+ let shutdown_msg =
+ NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0)).expect("pack shutdown message");
+ let _ = self.conn.send(shutdown_msg).await;
// Force shutting down
self.task_group.cancel().await;
@@ -170,7 +172,7 @@ impl Peer {
/// Start a read loop to handle incoming messages from the peer connection.
async fn read_loop(&self) -> Result<()> {
loop {
- let fut = select(self.stop_chan.1.recv(), self.codec.read()).await;
+ let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await;
let result = match fut {
Either::Left(stop_signal) => {
trace!("Peer {} received a stop signal", self.id);