aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/peer
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src/peer')
-rw-r--r--p2p/src/peer/mod.rs23
-rw-r--r--p2p/src/peer/peer_id.rs17
2 files changed, 27 insertions, 13 deletions
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 85cd558..6ed0dd8 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -11,17 +11,17 @@ use smol::{
};
use karyons_core::{
- async_utils::{select, Either, TaskGroup, TaskResult},
+ async_util::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
- utils::{decode, encode},
+ util::{decode, encode},
GlobalExecutor,
};
use karyons_net::Endpoint;
use crate::{
+ codec::{Codec, CodecMsg},
connection::ConnDirection,
- io_codec::{CodecMsg, IOCodec},
message::{NetMsgCmd, ProtocolMsg, ShutdownMsg},
peer_pool::{ArcPeerPool, WeakPeerPool},
protocol::{Protocol, ProtocolEvent, ProtocolID},
@@ -37,8 +37,8 @@ pub struct Peer {
/// A weak pointer to `PeerPool`
peer_pool: WeakPeerPool,
- /// Holds the IOCodec for the peer connection
- io_codec: IOCodec,
+ /// Holds the Codec for the peer connection
+ codec: Codec,
/// Remote endpoint for the peer
remote_endpoint: Endpoint,
@@ -64,7 +64,7 @@ impl Peer {
pub fn new(
peer_pool: WeakPeerPool,
id: &PeerID,
- io_codec: IOCodec,
+ codec: Codec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
ex: GlobalExecutor,
@@ -72,7 +72,7 @@ impl Peer {
Arc::new(Peer {
id: id.clone(),
peer_pool,
- io_codec,
+ codec,
protocol_ids: RwLock::new(Vec::new()),
remote_endpoint,
conn_direction,
@@ -97,7 +97,7 @@ impl Peer {
payload: payload.to_vec(),
};
- self.io_codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
+ self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
Ok(())
}
@@ -124,10 +124,7 @@ impl Peer {
let _ = self.stop_chan.0.try_send(Ok(()));
// No need to handle the error here
- let _ = self
- .io_codec
- .write(NetMsgCmd::Shutdown, &ShutdownMsg(0))
- .await;
+ let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await;
// Force shutting down
self.task_group.cancel().await;
@@ -174,7 +171,7 @@ impl Peer {
/// Start a read loop to handle incoming messages from the peer connection.
async fn read_loop(&self) -> Result<()> {
loop {
- let fut = select(self.stop_chan.1.recv(), self.io_codec.read()).await;
+ let fut = select(self.stop_chan.1.recv(), self.codec.read()).await;
let result = match fut {
Either::Left(stop_signal) => {
trace!("Peer {} received a stop signal", self.id);
diff --git a/p2p/src/peer/peer_id.rs b/p2p/src/peer/peer_id.rs
index c8aec7d..903d827 100644
--- a/p2p/src/peer/peer_id.rs
+++ b/p2p/src/peer/peer_id.rs
@@ -2,6 +2,10 @@ use bincode::{Decode, Encode};
use rand::{rngs::OsRng, RngCore};
use sha2::{Digest, Sha256};
+use karyons_core::key_pair::PublicKey;
+
+use crate::Error;
+
/// Represents a unique identifier for a peer.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Decode, Encode)]
pub struct PeerID(pub [u8; 32]);
@@ -39,3 +43,16 @@ impl From<[u8; 32]> for PeerID {
PeerID(b)
}
}
+
+impl TryFrom<PublicKey> for PeerID {
+ type Error = Error;
+
+ fn try_from(pk: PublicKey) -> Result<Self, Self::Error> {
+ let pk: [u8; 32] = pk
+ .as_bytes()
+ .try_into()
+ .map_err(|_| Error::TryFromPublicKey("Failed to convert public key to [u8;32]"))?;
+
+ Ok(PeerID(pk))
+ }
+}