diff options
Diffstat (limited to 'p2p/src/peer/mod.rs')
-rw-r--r-- | p2p/src/peer/mod.rs | 42 |
1 files changed, 22 insertions, 20 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index ca68530..f0f6f17 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -4,24 +4,22 @@ pub use peer_id::PeerID; use std::sync::Arc; +use async_channel::{Receiver, Sender}; +use bincode::{Decode, Encode}; use log::{error, trace}; -use smol::{ - channel::{self, Receiver, Sender}, - lock::RwLock, -}; use karyon_core::{ - async_util::{select, Either, Executor, TaskGroup, TaskResult}, + async_runtime::{lock::RwLock, Executor}, + async_util::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, util::{decode, encode}, }; -use karyon_net::Endpoint; +use karyon_net::{Conn, Endpoint}; use crate::{ - codec::{Codec, CodecMsg}, connection::ConnDirection, - message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, + message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg}, peer_pool::{ArcPeerPool, WeakPeerPool}, protocol::{Protocol, ProtocolEvent, ProtocolID}, Config, Error, Result, @@ -36,8 +34,8 @@ pub struct Peer { /// A weak pointer to `PeerPool` peer_pool: WeakPeerPool, - /// Holds the Codec for the peer connection - codec: Codec, + /// Holds the peer connection + conn: Conn<NetMsg>, /// Remote endpoint for the peer remote_endpoint: Endpoint, @@ -55,7 +53,7 @@ pub struct Peer { stop_chan: (Sender<Result<()>>, Receiver<Result<()>>), /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, } impl Peer { @@ -63,21 +61,21 @@ impl Peer { pub fn new( peer_pool: WeakPeerPool, id: &PeerID, - codec: Codec, + conn: Conn<NetMsg>, remote_endpoint: Endpoint, conn_direction: ConnDirection, - ex: Executor<'static>, + ex: Executor, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), peer_pool, - codec, + conn, protocol_ids: RwLock::new(Vec::new()), remote_endpoint, conn_direction, protocol_events: EventSys::new(), task_group: TaskGroup::with_executor(ex), - stop_chan: channel::bounded(1), + stop_chan: async_channel::bounded(1), }) } @@ -88,7 +86,7 @@ impl Peer { } /// Send a message to the peer connection using the specified protocol. - pub async fn send<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { + pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { let payload = encode(msg)?; let proto_msg = ProtocolMsg { @@ -96,12 +94,14 @@ impl Peer { payload: payload.to_vec(), }; - self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?; + self.conn + .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?) + .await?; Ok(()) } /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) { + pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) { self.peer_pool().broadcast(protocol_id, msg).await; } @@ -123,7 +123,9 @@ impl Peer { let _ = self.stop_chan.0.try_send(Ok(())); // No need to handle the error here - let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await; + let shutdown_msg = + NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0)).expect("pack shutdown message"); + let _ = self.conn.send(shutdown_msg).await; // Force shutting down self.task_group.cancel().await; @@ -170,7 +172,7 @@ impl Peer { /// Start a read loop to handle incoming messages from the peer connection. async fn read_loop(&self) -> Result<()> { loop { - let fut = select(self.stop_chan.1.recv(), self.codec.read()).await; + let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await; let result = match fut { Either::Left(stop_signal) => { trace!("Peer {} received a stop signal", self.id); |