diff options
Diffstat (limited to 'p2p/src/discovery/lookup.rs')
-rw-r--r-- | p2p/src/discovery/lookup.rs | 71 |
1 files changed, 38 insertions, 33 deletions
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index c81fbc6..cff4610 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -3,10 +3,13 @@ use std::{sync::Arc, time::Duration}; use futures_util::{stream::FuturesUnordered, StreamExt}; use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; -use smol::lock::{Mutex, RwLock}; use karyon_core::{ - async_util::{timeout, Executor}, + async_runtime::{ + lock::{Mutex, RwLock}, + Executor, + }, + async_util::timeout, crypto::KeyPair, util::decode, }; @@ -14,7 +17,6 @@ use karyon_core::{ use karyon_net::{Conn, Endpoint}; use crate::{ - codec::Codec, connector::Connector, listener::Listener, message::{ @@ -64,7 +66,7 @@ impl LookupService { table: Arc<Mutex<RoutingTable>>, config: Arc<Config>, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> Self { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); @@ -228,8 +230,7 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result<Vec<PeerMsg>> { let conn = self.connector.connect(&endpoint, &peer_id).await?; - let io_codec = Codec::new(conn); - let result = self.handle_outbound(io_codec, target_peer_id).await; + let result = self.handle_outbound(conn, target_peer_id).await; self.monitor .notify(&ConnEvent::Disconnected(endpoint).into()) @@ -242,14 +243,14 @@ impl LookupService { /// Handles outbound connection async fn handle_outbound( &self, - io_codec: Codec, + conn: Conn<NetMsg>, target_peer_id: &PeerID, ) -> Result<Vec<PeerMsg>> { trace!("Send Ping msg"); - self.send_ping_msg(&io_codec).await?; + self.send_ping_msg(&conn).await?; trace!("Send FindPeer msg"); - let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?; + let peers = self.send_findpeer_msg(&conn, target_peer_id).await?; if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { return Err(Error::Lookup("Received too many peers in PeersMsg")); @@ -257,12 +258,12 @@ impl LookupService { trace!("Send Peer msg"); if let Some(endpoint) = &self.listen_endpoint { - self.send_peer_msg(&io_codec, endpoint.read().await.clone()) + self.send_peer_msg(&conn, endpoint.read().await.clone()) .await?; } trace!("Send Shutdown msg"); - self.send_shutdown_msg(&io_codec).await?; + self.send_shutdown_msg(&conn).await?; Ok(peers.0) } @@ -277,7 +278,7 @@ impl LookupService { let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); let selfc = self.clone(); - let callback = |conn: Conn| async move { + let callback = |conn: Conn<NetMsg>| async move { let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); timeout(t, selfc.handle_inbound(conn)).await??; Ok(()) @@ -288,10 +289,9 @@ impl LookupService { } /// Handles inbound connection - async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> { - let io_codec = Codec::new(conn); + async fn handle_inbound(self: &Arc<Self>, conn: Conn<NetMsg>) -> Result<()> { loop { - let msg: NetMsg = io_codec.read().await?; + let msg: NetMsg = conn.recv().await?; trace!("Receive msg {:?}", msg.header.command); if let NetMsgCmd::Shutdown = msg.header.command { @@ -304,12 +304,12 @@ impl LookupService { if !version_match(&self.config.version.req, &ping_msg.version) { return Err(Error::IncompatibleVersion("system: {}".into())); } - self.send_pong_msg(ping_msg.nonce, &io_codec).await?; + self.send_pong_msg(ping_msg.nonce, &conn).await?; } NetMsgCmd::FindPeer => { let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?; let peer_id = findpeer_msg.0; - self.send_peers_msg(&peer_id, &io_codec).await?; + self.send_peers_msg(&peer_id, &conn).await?; } NetMsgCmd::Peer => { let (peer, _) = decode::<PeerMsg>(&msg.payload)?; @@ -322,7 +322,7 @@ impl LookupService { } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> { + async fn send_ping_msg(&self, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Pong msg"); let mut nonce: [u8; 32] = [0; 32]; @@ -332,10 +332,10 @@ impl LookupService { version: self.config.version.v.clone(), nonce, }; - io_codec.write(NetMsgCmd::Ping, &ping_msg).await?; + conn.send(NetMsg::new(NetMsgCmd::Ping, &ping_msg)?).await?; let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + let recv_msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Pong, recv_msg); let (pong_msg, _) = decode::<PongMsg>(&payload)?; @@ -348,21 +348,24 @@ impl LookupService { } /// Sends a Pong msg - async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> { + async fn send_pong_msg(&self, nonce: [u8; 32], conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Pong msg"); - io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; + conn.send(NetMsg::new(NetMsgCmd::Pong, &PongMsg(nonce))?) + .await?; Ok(()) } /// Sends a FindPeer msg and wait to receivet the Peers msg. - async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> { + async fn send_findpeer_msg(&self, conn: &Conn<NetMsg>, peer_id: &PeerID) -> Result<PeersMsg> { trace!("Send FindPeer msg"); - io_codec - .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) - .await?; + conn.send(NetMsg::new( + NetMsgCmd::FindPeer, + &FindPeerMsg(peer_id.clone()), + )?) + .await?; let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + let recv_msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Peers, recv_msg); let (peers, _) = decode(&payload)?; @@ -371,19 +374,20 @@ impl LookupService { } /// Sends a Peers msg. - async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> { + async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Peers msg"); let table = self.table.lock().await; let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); drop(table); let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect(); - io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?; + conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?) + .await?; Ok(()) } /// Sends a Peer msg. - async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> { + async fn send_peer_msg(&self, conn: &Conn<NetMsg>, endpoint: Endpoint) -> Result<()> { trace!("Send Peer msg"); let peer_msg = PeerMsg { addr: endpoint.addr()?.clone(), @@ -391,14 +395,15 @@ impl LookupService { discovery_port: self.config.discovery_port, peer_id: self.id.clone(), }; - io_codec.write(NetMsgCmd::Peer, &peer_msg).await?; + conn.send(NetMsg::new(NetMsgCmd::Peer, &peer_msg)?).await?; Ok(()) } /// Sends a Shutdown msg. - async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> { + async fn send_shutdown_msg(&self, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Shutdown msg"); - io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; + conn.send(NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0))?) + .await?; Ok(()) } } |