diff options
Diffstat (limited to 'p2p/src/peer')
| -rw-r--r-- | p2p/src/peer/mod.rs | 42 | 
1 files changed, 22 insertions, 20 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index ca68530..f0f6f17 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -4,24 +4,22 @@ pub use peer_id::PeerID;  use std::sync::Arc; +use async_channel::{Receiver, Sender}; +use bincode::{Decode, Encode};  use log::{error, trace}; -use smol::{ -    channel::{self, Receiver, Sender}, -    lock::RwLock, -};  use karyon_core::{ -    async_util::{select, Either, Executor, TaskGroup, TaskResult}, +    async_runtime::{lock::RwLock, Executor}, +    async_util::{select, Either, TaskGroup, TaskResult},      event::{ArcEventSys, EventListener, EventSys},      util::{decode, encode},  }; -use karyon_net::Endpoint; +use karyon_net::{Conn, Endpoint};  use crate::{ -    codec::{Codec, CodecMsg},      connection::ConnDirection, -    message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, +    message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},      peer_pool::{ArcPeerPool, WeakPeerPool},      protocol::{Protocol, ProtocolEvent, ProtocolID},      Config, Error, Result, @@ -36,8 +34,8 @@ pub struct Peer {      /// A weak pointer to `PeerPool`      peer_pool: WeakPeerPool, -    /// Holds the Codec for the peer connection -    codec: Codec, +    /// Holds the peer connection +    conn: Conn<NetMsg>,      /// Remote endpoint for the peer      remote_endpoint: Endpoint, @@ -55,7 +53,7 @@ pub struct Peer {      stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),      /// Managing spawned tasks. -    task_group: TaskGroup<'static>, +    task_group: TaskGroup,  }  impl Peer { @@ -63,21 +61,21 @@ impl Peer {      pub fn new(          peer_pool: WeakPeerPool,          id: &PeerID, -        codec: Codec, +        conn: Conn<NetMsg>,          remote_endpoint: Endpoint,          conn_direction: ConnDirection, -        ex: Executor<'static>, +        ex: Executor,      ) -> ArcPeer {          Arc::new(Peer {              id: id.clone(),              peer_pool, -            codec, +            conn,              protocol_ids: RwLock::new(Vec::new()),              remote_endpoint,              conn_direction,              protocol_events: EventSys::new(),              task_group: TaskGroup::with_executor(ex), -            stop_chan: channel::bounded(1), +            stop_chan: async_channel::bounded(1),          })      } @@ -88,7 +86,7 @@ impl Peer {      }      /// Send a message to the peer connection using the specified protocol. -    pub async fn send<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { +    pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {          let payload = encode(msg)?;          let proto_msg = ProtocolMsg { @@ -96,12 +94,14 @@ impl Peer {              payload: payload.to_vec(),          }; -        self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?; +        self.conn +            .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?) +            .await?;          Ok(())      }      /// Broadcast a message to all connected peers using the specified protocol. -    pub async fn broadcast<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) { +    pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) {          self.peer_pool().broadcast(protocol_id, msg).await;      } @@ -123,7 +123,9 @@ impl Peer {          let _ = self.stop_chan.0.try_send(Ok(()));          // No need to handle the error here -        let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await; +        let shutdown_msg = +            NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0)).expect("pack shutdown message"); +        let _ = self.conn.send(shutdown_msg).await;          // Force shutting down          self.task_group.cancel().await; @@ -170,7 +172,7 @@ impl Peer {      /// Start a read loop to handle incoming messages from the peer connection.      async fn read_loop(&self) -> Result<()> {          loop { -            let fut = select(self.stop_chan.1.recv(), self.codec.read()).await; +            let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await;              let result = match fut {                  Either::Left(stop_signal) => {                      trace!("Peer {} received a stop signal", self.id);  | 
