From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- p2p/src/connection.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 p2p/src/connection.rs (limited to 'p2p/src/connection.rs') diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs new file mode 100644 index 0000000..52190a8 --- /dev/null +++ b/p2p/src/connection.rs @@ -0,0 +1,110 @@ +use std::{collections::HashMap, fmt, sync::Arc}; + +use async_channel::Sender; +use bincode::Encode; + +use karyon_core::{ + event::{EventListener, EventSys}, + util::encode, +}; + +use karyon_net::{Conn, Endpoint}; + +use crate::{ + message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg}, + protocol::{Protocol, ProtocolEvent, ProtocolID}, + Error, Result, +}; + +/// Defines the direction of a network connection. +#[derive(Clone, Debug)] +pub enum ConnDirection { + Inbound, + Outbound, +} + +impl fmt::Display for ConnDirection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ConnDirection::Inbound => write!(f, "Inbound"), + ConnDirection::Outbound => write!(f, "Outbound"), + } + } +} + +pub struct Connection { + pub(crate) direction: ConnDirection, + conn: Conn, + disconnect_signal: Sender>, + /// `EventSys` responsible for sending events to the registered protocols. + protocol_events: Arc>, + pub(crate) remote_endpoint: Endpoint, + listeners: HashMap>, +} + +impl Connection { + pub fn new( + conn: Conn, + signal: Sender>, + direction: ConnDirection, + remote_endpoint: Endpoint, + ) -> Self { + Self { + conn, + direction, + protocol_events: EventSys::new(), + disconnect_signal: signal, + remote_endpoint, + listeners: HashMap::new(), + } + } + + pub async fn send(&self, protocol_id: ProtocolID, msg: T) -> Result<()> { + let payload = encode(&msg)?; + + let proto_msg = ProtocolMsg { + protocol_id, + payload: payload.to_vec(), + }; + + let msg = NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?; + self.conn.send(msg).await.map_err(Error::from) + } + + pub async fn recv(&self) -> Result { + match self.listeners.get(&P::id()) { + Some(l) => l.recv().await.map_err(Error::from), + // TODO + None => todo!(), + } + } + + /// Registers a listener for the given Protocol `P`. + pub async fn register_protocol(&mut self, protocol_id: String) { + let listener = self.protocol_events.register(&protocol_id).await; + self.listeners.insert(protocol_id, listener); + } + + pub async fn emit_msg(&self, id: &ProtocolID, event: &ProtocolEvent) -> Result<()> { + self.protocol_events.emit_by_topic(id, event).await?; + Ok(()) + } + + pub async fn recv_inner(&self) -> Result { + self.conn.recv().await.map_err(Error::from) + } + + pub async fn send_inner(&self, msg: NetMsg) -> Result<()> { + self.conn.send(msg).await.map_err(Error::from) + } + + pub async fn disconnect(&self, res: Result<()>) -> Result<()> { + self.protocol_events.clear().await; + self.disconnect_signal.send(res).await?; + + let m = NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0)).expect("Create shutdown message"); + self.conn.send(m).await.map_err(Error::from)?; + + Ok(()) + } +} -- cgit v1.2.3