aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/connection.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
commite15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch)
tree7976f6993e4f6b3646f5bd6954189346d5ffd330 /p2p/src/connection.rs
parent6c65232d741229635151671708556b9af7ef75ac (diff)
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'p2p/src/connection.rs')
-rw-r--r--p2p/src/connection.rs110
1 files changed, 110 insertions, 0 deletions
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
new file mode 100644
index 0000000..52190a8
--- /dev/null
+++ b/p2p/src/connection.rs
@@ -0,0 +1,110 @@
+use std::{collections::HashMap, fmt, sync::Arc};
+
+use async_channel::Sender;
+use bincode::Encode;
+
+use karyon_core::{
+ event::{EventListener, EventSys},
+ util::encode,
+};
+
+use karyon_net::{Conn, Endpoint};
+
+use crate::{
+ message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
+ protocol::{Protocol, ProtocolEvent, ProtocolID},
+ Error, Result,
+};
+
+/// Defines the direction of a network connection.
+#[derive(Clone, Debug)]
+pub enum ConnDirection {
+ Inbound,
+ Outbound,
+}
+
+impl fmt::Display for ConnDirection {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ ConnDirection::Inbound => write!(f, "Inbound"),
+ ConnDirection::Outbound => write!(f, "Outbound"),
+ }
+ }
+}
+
+pub struct Connection {
+ pub(crate) direction: ConnDirection,
+ conn: Conn<NetMsg>,
+ disconnect_signal: Sender<Result<()>>,
+ /// `EventSys` responsible for sending events to the registered protocols.
+ protocol_events: Arc<EventSys<ProtocolID>>,
+ pub(crate) remote_endpoint: Endpoint,
+ listeners: HashMap<ProtocolID, EventListener<ProtocolID, ProtocolEvent>>,
+}
+
+impl Connection {
+ pub fn new(
+ conn: Conn<NetMsg>,
+ signal: Sender<Result<()>>,
+ direction: ConnDirection,
+ remote_endpoint: Endpoint,
+ ) -> Self {
+ Self {
+ conn,
+ direction,
+ protocol_events: EventSys::new(),
+ disconnect_signal: signal,
+ remote_endpoint,
+ listeners: HashMap::new(),
+ }
+ }
+
+ pub async fn send<T: Encode>(&self, protocol_id: ProtocolID, msg: T) -> Result<()> {
+ let payload = encode(&msg)?;
+
+ let proto_msg = ProtocolMsg {
+ protocol_id,
+ payload: payload.to_vec(),
+ };
+
+ let msg = NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?;
+ self.conn.send(msg).await.map_err(Error::from)
+ }
+
+ pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
+ match self.listeners.get(&P::id()) {
+ Some(l) => l.recv().await.map_err(Error::from),
+ // TODO
+ None => todo!(),
+ }
+ }
+
+ /// Registers a listener for the given Protocol `P`.
+ pub async fn register_protocol(&mut self, protocol_id: String) {
+ let listener = self.protocol_events.register(&protocol_id).await;
+ self.listeners.insert(protocol_id, listener);
+ }
+
+ pub async fn emit_msg(&self, id: &ProtocolID, event: &ProtocolEvent) -> Result<()> {
+ self.protocol_events.emit_by_topic(id, event).await?;
+ Ok(())
+ }
+
+ pub async fn recv_inner(&self) -> Result<NetMsg> {
+ self.conn.recv().await.map_err(Error::from)
+ }
+
+ pub async fn send_inner(&self, msg: NetMsg) -> Result<()> {
+ self.conn.send(msg).await.map_err(Error::from)
+ }
+
+ pub async fn disconnect(&self, res: Result<()>) -> Result<()> {
+ self.protocol_events.clear().await;
+ self.disconnect_signal.send(res).await?;
+
+ let m = NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0)).expect("Create shutdown message");
+ self.conn.send(m).await.map_err(Error::from)?;
+
+ Ok(())
+ }
+}