From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@karyontech.net>
Date: Thu, 11 Apr 2024 10:19:20 +0200
Subject: add support for tokio & improve net crate api

---
 p2p/src/peer/mod.rs | 42 ++++++++++++++++++++++--------------------
 1 file changed, 22 insertions(+), 20 deletions(-)

(limited to 'p2p/src/peer')

diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index ca68530..f0f6f17 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -4,24 +4,22 @@ pub use peer_id::PeerID;
 
 use std::sync::Arc;
 
+use async_channel::{Receiver, Sender};
+use bincode::{Decode, Encode};
 use log::{error, trace};
-use smol::{
-    channel::{self, Receiver, Sender},
-    lock::RwLock,
-};
 
 use karyon_core::{
-    async_util::{select, Either, Executor, TaskGroup, TaskResult},
+    async_runtime::{lock::RwLock, Executor},
+    async_util::{select, Either, TaskGroup, TaskResult},
     event::{ArcEventSys, EventListener, EventSys},
     util::{decode, encode},
 };
 
-use karyon_net::Endpoint;
+use karyon_net::{Conn, Endpoint};
 
 use crate::{
-    codec::{Codec, CodecMsg},
     connection::ConnDirection,
-    message::{NetMsgCmd, ProtocolMsg, ShutdownMsg},
+    message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
     peer_pool::{ArcPeerPool, WeakPeerPool},
     protocol::{Protocol, ProtocolEvent, ProtocolID},
     Config, Error, Result,
@@ -36,8 +34,8 @@ pub struct Peer {
     /// A weak pointer to `PeerPool`
     peer_pool: WeakPeerPool,
 
-    /// Holds the Codec for the peer connection
-    codec: Codec,
+    /// Holds the peer connection
+    conn: Conn<NetMsg>,
 
     /// Remote endpoint for the peer
     remote_endpoint: Endpoint,
@@ -55,7 +53,7 @@ pub struct Peer {
     stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
 
     /// Managing spawned tasks.
-    task_group: TaskGroup<'static>,
+    task_group: TaskGroup,
 }
 
 impl Peer {
@@ -63,21 +61,21 @@ impl Peer {
     pub fn new(
         peer_pool: WeakPeerPool,
         id: &PeerID,
-        codec: Codec,
+        conn: Conn<NetMsg>,
         remote_endpoint: Endpoint,
         conn_direction: ConnDirection,
-        ex: Executor<'static>,
+        ex: Executor,
     ) -> ArcPeer {
         Arc::new(Peer {
             id: id.clone(),
             peer_pool,
-            codec,
+            conn,
             protocol_ids: RwLock::new(Vec::new()),
             remote_endpoint,
             conn_direction,
             protocol_events: EventSys::new(),
             task_group: TaskGroup::with_executor(ex),
-            stop_chan: channel::bounded(1),
+            stop_chan: async_channel::bounded(1),
         })
     }
 
@@ -88,7 +86,7 @@ impl Peer {
     }
 
     /// Send a message to the peer connection using the specified protocol.
-    pub async fn send<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {
+    pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> {
         let payload = encode(msg)?;
 
         let proto_msg = ProtocolMsg {
@@ -96,12 +94,14 @@ impl Peer {
             payload: payload.to_vec(),
         };
 
-        self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
+        self.conn
+            .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?)
+            .await?;
         Ok(())
     }
 
     /// Broadcast a message to all connected peers using the specified protocol.
-    pub async fn broadcast<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) {
+    pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) {
         self.peer_pool().broadcast(protocol_id, msg).await;
     }
 
@@ -123,7 +123,9 @@ impl Peer {
         let _ = self.stop_chan.0.try_send(Ok(()));
 
         // No need to handle the error here
-        let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await;
+        let shutdown_msg =
+            NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0)).expect("pack shutdown message");
+        let _ = self.conn.send(shutdown_msg).await;
 
         // Force shutting down
         self.task_group.cancel().await;
@@ -170,7 +172,7 @@ impl Peer {
     /// Start a read loop to handle incoming messages from the peer connection.
     async fn read_loop(&self) -> Result<()> {
         loop {
-            let fut = select(self.stop_chan.1.recv(), self.codec.read()).await;
+            let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await;
             let result = match fut {
                 Either::Left(stop_signal) => {
                     trace!("Peer {} received a stop signal", self.id);
-- 
cgit v1.2.3