aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md6
-rw-r--r--core/Cargo.toml6
-rw-r--r--core/src/async_util/backoff.rs (renamed from core/src/async_utils/backoff.rs)2
-rw-r--r--core/src/async_util/condvar.rs (renamed from core/src/async_utils/condvar.rs)4
-rw-r--r--core/src/async_util/condwait.rs (renamed from core/src/async_utils/condwait.rs)2
-rw-r--r--core/src/async_util/mod.rs (renamed from core/src/async_utils/mod.rs)0
-rw-r--r--core/src/async_util/select.rs (renamed from core/src/async_utils/select.rs)2
-rw-r--r--core/src/async_util/task_group.rs (renamed from core/src/async_utils/task_group.rs)2
-rw-r--r--core/src/async_util/timeout.rs (renamed from core/src/async_utils/timeout.rs)2
-rw-r--r--core/src/error.rs6
-rw-r--r--core/src/event.rs2
-rw-r--r--core/src/key_pair.rs189
-rw-r--r--core/src/lib.rs9
-rw-r--r--core/src/pubsub.rs2
-rw-r--r--core/src/util/decode.rs (renamed from core/src/utils/decode.rs)0
-rw-r--r--core/src/util/encode.rs (renamed from core/src/utils/encode.rs)0
-rw-r--r--core/src/util/mod.rs (renamed from core/src/utils/mod.rs)0
-rw-r--r--core/src/util/path.rs (renamed from core/src/utils/path.rs)0
-rw-r--r--jsonrpc/src/client.rs2
-rw-r--r--jsonrpc/src/codec.rs2
-rw-r--r--jsonrpc/src/server.rs2
-rw-r--r--net/Cargo.toml3
-rw-r--r--net/src/connection.rs9
-rw-r--r--net/src/endpoint.rs70
-rw-r--r--net/src/error.rs8
-rw-r--r--net/src/lib.rs1
-rw-r--r--net/src/listener.rs5
-rw-r--r--net/src/transports/mod.rs1
-rw-r--r--net/src/transports/tcp.rs2
-rw-r--r--net/src/transports/tls.rs140
-rw-r--r--net/src/transports/udp.rs2
-rw-r--r--net/src/transports/unix.rs2
-rw-r--r--p2p/Cargo.toml6
-rw-r--r--p2p/README.md11
-rw-r--r--p2p/examples/chat.rs7
-rw-r--r--p2p/examples/monitor.rs14
-rwxr-xr-xp2p/examples/net_simulation.sh24
-rw-r--r--p2p/examples/peer.rs14
-rw-r--r--p2p/src/backend.rs43
-rw-r--r--p2p/src/codec.rs (renamed from p2p/src/io_codec.rs)24
-rw-r--r--p2p/src/config.rs7
-rw-r--r--p2p/src/connection.rs2
-rw-r--r--p2p/src/connector.rs56
-rw-r--r--p2p/src/discovery/lookup.rs71
-rw-r--r--p2p/src/discovery/mod.rs30
-rw-r--r--p2p/src/discovery/refresh.rs4
-rw-r--r--p2p/src/error.rs18
-rw-r--r--p2p/src/lib.rs13
-rw-r--r--p2p/src/listener.rs65
-rw-r--r--p2p/src/message.rs2
-rw-r--r--p2p/src/monitor.rs4
-rw-r--r--p2p/src/peer/mod.rs23
-rw-r--r--p2p/src/peer/peer_id.rs17
-rw-r--r--p2p/src/peer_pool.rs46
-rw-r--r--p2p/src/protocol.rs7
-rw-r--r--p2p/src/protocols/ping.rs6
-rw-r--r--p2p/src/routing_table/entry.rs2
-rw-r--r--p2p/src/routing_table/mod.rs19
-rw-r--r--p2p/src/slots.rs2
-rw-r--r--p2p/src/tls_config.rs214
-rw-r--r--p2p/src/utils/mod.rs21
-rw-r--r--p2p/src/version.rs (renamed from p2p/src/utils/version.rs)0
62 files changed, 999 insertions, 256 deletions
diff --git a/README.md b/README.md
index 3343f85..68ef672 100644
--- a/README.md
+++ b/README.md
@@ -25,9 +25,9 @@ implementation for building collaborative software.
## Status
This project is a work in progress. The current focus is on shipping karyons
-crdt and karyons store, along with major changes to the network stack,
-including TLS implementation. You can check the
-[issues](https://github.com/karyons/karyons/issues) for updates on ongoing tasks.
+crdt and karyons store, along with major changes to the network stack. You can
+check the [issues](https://github.com/karyons/karyons/issues) for updates on
+ongoing tasks.
## Docs
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ab05288..5a99e2d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -10,9 +10,13 @@ edition.workspace = true
smol = "1.3.0"
pin-project-lite = "0.2.13"
log = "0.4.20"
-bincode = { version="2.0.0-rc.3", features = ["derive"]}
+bincode = "2.0.0-rc.3"
chrono = "0.4.30"
rand = "0.8.5"
thiserror = "1.0.47"
dirs = "5.0.1"
async-task = "4.5.0"
+ed25519-dalek = { version = "2.1.0", features = ["rand_core"]}
+
+
+
diff --git a/core/src/async_utils/backoff.rs b/core/src/async_util/backoff.rs
index f7e131d..a231229 100644
--- a/core/src/async_utils/backoff.rs
+++ b/core/src/async_util/backoff.rs
@@ -12,7 +12,7 @@ use smol::Timer;
/// # Examples
///
/// ```
-/// use karyons_core::async_utils::Backoff;
+/// use karyons_core::async_util::Backoff;
///
/// async {
/// let backoff = Backoff::new(300, 3000);
diff --git a/core/src/async_utils/condvar.rs b/core/src/async_util/condvar.rs
index 814f78f..7396d0d 100644
--- a/core/src/async_utils/condvar.rs
+++ b/core/src/async_util/condvar.rs
@@ -8,7 +8,7 @@ use std::{
use smol::lock::MutexGuard;
-use crate::utils::random_16;
+use crate::util::random_16;
/// CondVar is an async version of <https://doc.rust-lang.org/std/sync/struct.Condvar.html>
///
@@ -19,7 +19,7 @@ use crate::utils::random_16;
///
/// use smol::lock::Mutex;
///
-/// use karyons_core::async_utils::CondVar;
+/// use karyons_core::async_util::CondVar;
///
/// async {
///
diff --git a/core/src/async_utils/condwait.rs b/core/src/async_util/condwait.rs
index e31fac3..cd4b269 100644
--- a/core/src/async_utils/condwait.rs
+++ b/core/src/async_util/condwait.rs
@@ -9,7 +9,7 @@ use super::CondVar;
///```
/// use std::sync::Arc;
///
-/// use karyons_core::async_utils::CondWait;
+/// use karyons_core::async_util::CondWait;
///
/// async {
/// let cond_wait = Arc::new(CondWait::new());
diff --git a/core/src/async_utils/mod.rs b/core/src/async_util/mod.rs
index c871bad..c871bad 100644
--- a/core/src/async_utils/mod.rs
+++ b/core/src/async_util/mod.rs
diff --git a/core/src/async_utils/select.rs b/core/src/async_util/select.rs
index 9fe3c77..8f2f7f6 100644
--- a/core/src/async_utils/select.rs
+++ b/core/src/async_util/select.rs
@@ -12,7 +12,7 @@ use smol::future::Future;
/// ```
/// use std::future;
///
-/// use karyons_core::async_utils::{select, Either};
+/// use karyons_core::async_util::{select, Either};
///
/// async {
/// let fut1 = future::pending::<String>();
diff --git a/core/src/async_utils/task_group.rs b/core/src/async_util/task_group.rs
index afc9648..3fc0cb7 100644
--- a/core/src/async_utils/task_group.rs
+++ b/core/src/async_util/task_group.rs
@@ -14,7 +14,7 @@ use super::{select, CondWait, Either};
///
/// use std::sync::Arc;
///
-/// use karyons_core::async_utils::TaskGroup;
+/// use karyons_core::async_util::TaskGroup;
///
/// async {
///
diff --git a/core/src/async_utils/timeout.rs b/core/src/async_util/timeout.rs
index 7c55e1b..6ab35c4 100644
--- a/core/src/async_utils/timeout.rs
+++ b/core/src/async_util/timeout.rs
@@ -13,7 +13,7 @@ use crate::{error::Error, Result};
/// ```
/// use std::{future, time::Duration};
///
-/// use karyons_core::async_utils::timeout;
+/// use karyons_core::async_util::timeout;
///
/// async {
/// let fut = future::pending::<()>();
diff --git a/core/src/error.rs b/core/src/error.rs
index 63b45d3..7c547c4 100644
--- a/core/src/error.rs
+++ b/core/src/error.rs
@@ -7,12 +7,18 @@ pub enum Error {
#[error(transparent)]
IO(#[from] std::io::Error),
+ #[error("TryInto Error: {0}")]
+ TryInto(&'static str),
+
#[error("Timeout Error")]
Timeout,
#[error("Path Not Found Error: {0}")]
PathNotFound(&'static str),
+ #[error(transparent)]
+ Ed25519(#[from] ed25519_dalek::ed25519::Error),
+
#[error("Channel Send Error: {0}")]
ChannelSend(String),
diff --git a/core/src/event.rs b/core/src/event.rs
index 0503e88..f2c5510 100644
--- a/core/src/event.rs
+++ b/core/src/event.rs
@@ -12,7 +12,7 @@ use smol::{
lock::Mutex,
};
-use crate::{utils::random_16, Result};
+use crate::{util::random_16, Result};
pub type ArcEventSys<T> = Arc<EventSys<T>>;
pub type WeakEventSys<T> = Weak<EventSys<T>>;
diff --git a/core/src/key_pair.rs b/core/src/key_pair.rs
new file mode 100644
index 0000000..4016351
--- /dev/null
+++ b/core/src/key_pair.rs
@@ -0,0 +1,189 @@
+use ed25519_dalek::{Signer as _, Verifier as _};
+use rand::rngs::OsRng;
+
+use crate::{error::Error, Result};
+
+/// key cryptography type
+pub enum KeyPairType {
+ Ed25519,
+}
+
+/// A Public key
+pub struct PublicKey(PublicKeyInner);
+
+/// A Secret key
+pub struct SecretKey(Vec<u8>);
+
+impl PublicKey {
+ pub fn as_bytes(&self) -> &[u8] {
+ self.0.as_bytes()
+ }
+
+ /// Verify a signature on a message with this public key.
+ pub fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> {
+ self.0.verify(msg, signature)
+ }
+}
+
+impl PublicKey {
+ pub fn from_bytes(kp_type: &KeyPairType, pk: &[u8]) -> Result<Self> {
+ Ok(Self(PublicKeyInner::from_bytes(kp_type, pk)?))
+ }
+}
+
+/// A KeyPair.
+#[derive(Clone)]
+pub struct KeyPair(KeyPairInner);
+
+impl KeyPair {
+ /// Generate a new random keypair.
+ pub fn generate(kp_type: &KeyPairType) -> Self {
+ Self(KeyPairInner::generate(kp_type))
+ }
+
+ /// Sign a message using the private key.
+ pub fn sign(&self, msg: &[u8]) -> Vec<u8> {
+ self.0.sign(msg)
+ }
+
+ /// Get the public key of this keypair.
+ pub fn public(&self) -> PublicKey {
+ self.0.public()
+ }
+
+ /// Get the secret key of this keypair.
+ pub fn secret(&self) -> SecretKey {
+ self.0.secret()
+ }
+}
+
+/// An extension trait, adding essential methods to all [`KeyPair`] types.
+trait KeyPairExt {
+ /// Sign a message using the private key.
+ fn sign(&self, msg: &[u8]) -> Vec<u8>;
+
+ /// Get the public key of this keypair.
+ fn public(&self) -> PublicKey;
+
+ /// Get the secret key of this keypair.
+ fn secret(&self) -> SecretKey;
+}
+
+#[derive(Clone)]
+enum KeyPairInner {
+ Ed25519(Ed25519KeyPair),
+}
+
+impl KeyPairInner {
+ fn generate(kp_type: &KeyPairType) -> Self {
+ match kp_type {
+ KeyPairType::Ed25519 => Self::Ed25519(Ed25519KeyPair::generate()),
+ }
+ }
+}
+
+impl KeyPairExt for KeyPairInner {
+ fn sign(&self, msg: &[u8]) -> Vec<u8> {
+ match self {
+ KeyPairInner::Ed25519(kp) => kp.sign(msg),
+ }
+ }
+
+ fn public(&self) -> PublicKey {
+ match self {
+ KeyPairInner::Ed25519(kp) => kp.public(),
+ }
+ }
+
+ fn secret(&self) -> SecretKey {
+ match self {
+ KeyPairInner::Ed25519(kp) => kp.secret(),
+ }
+ }
+}
+
+#[derive(Clone)]
+struct Ed25519KeyPair(ed25519_dalek::SigningKey);
+
+impl Ed25519KeyPair {
+ fn generate() -> Self {
+ Self(ed25519_dalek::SigningKey::generate(&mut OsRng))
+ }
+}
+
+impl KeyPairExt for Ed25519KeyPair {
+ fn sign(&self, msg: &[u8]) -> Vec<u8> {
+ self.0.sign(msg).to_bytes().to_vec()
+ }
+
+ fn public(&self) -> PublicKey {
+ PublicKey(PublicKeyInner::Ed25519(Ed25519PublicKey(
+ self.0.verifying_key(),
+ )))
+ }
+
+ fn secret(&self) -> SecretKey {
+ SecretKey(self.0.to_bytes().to_vec())
+ }
+}
+
+/// An extension trait, adding essential methods to all [`PublicKey`] types.
+trait PublicKeyExt {
+ fn as_bytes(&self) -> &[u8];
+
+ /// Verify a signature on a message with this public key.
+ fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()>;
+}
+
+enum PublicKeyInner {
+ Ed25519(Ed25519PublicKey),
+}
+
+impl PublicKeyInner {
+ pub fn from_bytes(kp_type: &KeyPairType, pk: &[u8]) -> Result<Self> {
+ match kp_type {
+ KeyPairType::Ed25519 => Ok(Self::Ed25519(Ed25519PublicKey::from_bytes(pk)?)),
+ }
+ }
+}
+
+impl PublicKeyExt for PublicKeyInner {
+ fn as_bytes(&self) -> &[u8] {
+ match self {
+ Self::Ed25519(pk) => pk.as_bytes(),
+ }
+ }
+
+ fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> {
+ match self {
+ Self::Ed25519(pk) => pk.verify(msg, signature),
+ }
+ }
+}
+
+struct Ed25519PublicKey(ed25519_dalek::VerifyingKey);
+
+impl Ed25519PublicKey {
+ pub fn from_bytes(pk: &[u8]) -> Result<Self> {
+ let pk_bytes: [u8; 32] = pk
+ .try_into()
+ .map_err(|_| Error::TryInto("Failed to convert slice to [u8; 32]"))?;
+
+ Ok(Self(ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)?))
+ }
+}
+
+impl PublicKeyExt for Ed25519PublicKey {
+ fn as_bytes(&self) -> &[u8] {
+ self.0.as_bytes()
+ }
+
+ fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> {
+ let sig_bytes: [u8; 64] = signature
+ .try_into()
+ .map_err(|_| Error::TryInto("Failed to convert slice to [u8; 64]"))?;
+ self.0
+ .verify(msg, &ed25519_dalek::Signature::from_bytes(&sig_bytes))?;
+ Ok(())
+ }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 67e6610..276ed89 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -1,19 +1,22 @@
/// A set of helper tools and functions.
-pub mod utils;
+pub mod util;
/// A module containing async utilities that work with the
/// [`smol`](https://github.com/smol-rs/smol) async runtime.
-pub mod async_utils;
+pub mod async_util;
/// Represents karyons's Core Error.
pub mod error;
-/// [`event::EventSys`] Implementation
+/// [`event::EventSys`] implementation.
pub mod event;
/// A simple publish-subscribe system [`Read More`](./pubsub/struct.Publisher.html)
pub mod pubsub;
+/// A cryptographic key pair
+pub mod key_pair;
+
use smol::Executor as SmolEx;
use std::sync::Arc;
diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs
index 4cc0ab7..306d42f 100644
--- a/core/src/pubsub.rs
+++ b/core/src/pubsub.rs
@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use log::error;
use smol::lock::Mutex;
-use crate::{utils::random_16, Result};
+use crate::{util::random_16, Result};
pub type ArcPublisher<T> = Arc<Publisher<T>>;
pub type SubscriptionID = u16;
diff --git a/core/src/utils/decode.rs b/core/src/util/decode.rs
index a8a6522..a8a6522 100644
--- a/core/src/utils/decode.rs
+++ b/core/src/util/decode.rs
diff --git a/core/src/utils/encode.rs b/core/src/util/encode.rs
index 7d1061b..7d1061b 100644
--- a/core/src/utils/encode.rs
+++ b/core/src/util/encode.rs
diff --git a/core/src/utils/mod.rs b/core/src/util/mod.rs
index a3c3f50..a3c3f50 100644
--- a/core/src/utils/mod.rs
+++ b/core/src/util/mod.rs
diff --git a/core/src/utils/path.rs b/core/src/util/path.rs
index 2cd900a..2cd900a 100644
--- a/core/src/utils/path.rs
+++ b/core/src/util/path.rs
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
index f5277aa..939d177 100644
--- a/jsonrpc/src/client.rs
+++ b/jsonrpc/src/client.rs
@@ -1,7 +1,7 @@
use log::debug;
use serde::{de::DeserializeOwned, Serialize};
-use karyons_core::utils::random_32;
+use karyons_core::util::random_32;
use karyons_net::{dial, Conn, Endpoint};
use crate::{
diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs
index e198a6e..5dac8da 100644
--- a/jsonrpc/src/codec.rs
+++ b/jsonrpc/src/codec.rs
@@ -1,6 +1,6 @@
use memchr::memchr;
-use karyons_core::async_utils::timeout;
+use karyons_core::async_util::timeout;
use karyons_net::Conn;
use crate::{Error, Result};
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs
index 5b9b799..05ef7da 100644
--- a/jsonrpc/src/server.rs
+++ b/jsonrpc/src/server.rs
@@ -4,7 +4,7 @@ use log::{debug, error, warn};
use smol::lock::RwLock;
use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
+ async_util::{TaskGroup, TaskResult},
Executor,
};
use karyons_net::{listen, Conn, Endpoint, Listener};
diff --git a/net/Cargo.toml b/net/Cargo.toml
index de9b33b..863a250 100644
--- a/net/Cargo.toml
+++ b/net/Cargo.toml
@@ -11,6 +11,7 @@ karyons_core.workspace = true
smol = "1.3.0"
async-trait = "0.1.73"
log = "0.4.20"
-bincode = { version="2.0.0-rc.3", features = ["derive"]}
+bincode = "2.0.0-rc.3"
thiserror = "1.0.47"
url = "2.4.1"
+async-rustls = { version = "0.4.1", features = ["dangerous_configuration"] }
diff --git a/net/src/connection.rs b/net/src/connection.rs
index d8ec0a3..b1d7550 100644
--- a/net/src/connection.rs
+++ b/net/src/connection.rs
@@ -1,7 +1,9 @@
-use crate::{Endpoint, Result};
use async_trait::async_trait;
-use crate::transports::{tcp, udp, unix};
+use crate::{
+ transports::{tcp, udp, unix},
+ Endpoint, Error, Result,
+};
/// Alias for `Box<dyn Connection>`
pub type Conn = Box<dyn Connection>;
@@ -28,7 +30,7 @@ pub trait Connection: Send + Sync {
/// Connects to the provided endpoint.
///
-/// it only supports `tcp4/6`, `udp4/6` and `unix`.
+/// it only supports `tcp4/6`, `udp4/6`, and `unix`.
///
/// #Example
///
@@ -53,5 +55,6 @@ pub async fn dial(endpoint: &Endpoint) -> Result<Conn> {
Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::dial_tcp(addr, port).await?)),
Endpoint::Udp(addr, port) => Ok(Box::new(udp::dial_udp(addr, port).await?)),
Endpoint::Unix(addr) => Ok(Box::new(unix::dial_unix(addr).await?)),
+ _ => Err(Error::InvalidEndpoint(endpoint.to_string())),
}
}
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs
index 50dfe6b..720eea3 100644
--- a/net/src/endpoint.rs
+++ b/net/src/endpoint.rs
@@ -5,7 +5,7 @@ use std::{
str::FromStr,
};
-use bincode::{Decode, Encode};
+use bincode::{impl_borrow_decode, Decode, Encode};
use url::Url;
use crate::{Error, Result};
@@ -33,6 +33,7 @@ pub type Port = u16;
pub enum Endpoint {
Udp(Addr, Port),
Tcp(Addr, Port),
+ Tls(Addr, Port),
Unix(String),
}
@@ -45,6 +46,9 @@ impl std::fmt::Display for Endpoint {
Endpoint::Tcp(ip, port) => {
write!(f, "tcp://{}:{}", ip, port)
}
+ Endpoint::Tls(ip, port) => {
+ write!(f, "tls://{}:{}", ip, port)
+ }
Endpoint::Unix(path) => {
if path.is_empty() {
write!(f, "unix:/UNNAMED")
@@ -60,9 +64,10 @@ impl TryFrom<Endpoint> for SocketAddr {
type Error = Error;
fn try_from(endpoint: Endpoint) -> std::result::Result<SocketAddr, Self::Error> {
match endpoint {
- Endpoint::Udp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)),
- Endpoint::Tcp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)),
- Endpoint::Unix(_) => Err(Error::TryFromEndpointError),
+ Endpoint::Udp(ip, port) | Endpoint::Tcp(ip, port) | Endpoint::Tls(ip, port) => {
+ Ok(SocketAddr::new(ip.try_into()?, port))
+ }
+ Endpoint::Unix(_) => Err(Error::TryFromEndpoint),
}
}
}
@@ -72,7 +77,7 @@ impl TryFrom<Endpoint> for PathBuf {
fn try_from(endpoint: Endpoint) -> std::result::Result<PathBuf, Self::Error> {
match endpoint {
Endpoint::Unix(path) => Ok(PathBuf::from(&path)),
- _ => Err(Error::TryFromEndpointError),
+ _ => Err(Error::TryFromEndpoint),
}
}
}
@@ -82,7 +87,7 @@ impl TryFrom<Endpoint> for UnixSocketAddress {
fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> {
match endpoint {
Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?),
- _ => Err(Error::TryFromEndpointError),
+ _ => Err(Error::TryFromEndpoint),
}
}
}
@@ -112,6 +117,7 @@ impl FromStr for Endpoint {
match url.scheme() {
"tcp" => Ok(Endpoint::Tcp(addr, port)),
"udp" => Ok(Endpoint::Udp(addr, port)),
+ "tls" => Ok(Endpoint::Tls(addr, port)),
_ => Err(Error::InvalidEndpoint(s.to_string())),
}
} else {
@@ -133,6 +139,11 @@ impl Endpoint {
Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port())
}
+ /// Creates a new TLS endpoint from a `SocketAddr`.
+ pub fn new_tls_addr(addr: &SocketAddr) -> Endpoint {
+ Endpoint::Tls(Addr::Ip(addr.ip()), addr.port())
+ }
+
/// Creates a new UDP endpoint from a `SocketAddr`.
pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint {
Endpoint::Udp(Addr::Ip(addr.ip()), addr.port())
@@ -151,29 +162,62 @@ impl Endpoint {
/// Returns the `Port` of the endpoint.
pub fn port(&self) -> Result<&Port> {
match self {
- Endpoint::Tcp(_, port) => Ok(port),
- Endpoint::Udp(_, port) => Ok(port),
- _ => Err(Error::TryFromEndpointError),
+ Endpoint::Udp(_, port) | Endpoint::Tcp(_, port) | Endpoint::Tls(_, port) => Ok(port),
+ _ => Err(Error::TryFromEndpoint),
}
}
/// Returns the `Addr` of the endpoint.
pub fn addr(&self) -> Result<&Addr> {
match self {
- Endpoint::Tcp(addr, _) => Ok(addr),
- Endpoint::Udp(addr, _) => Ok(addr),
- _ => Err(Error::TryFromEndpointError),
+ Endpoint::Udp(addr, _) | Endpoint::Tcp(addr, _) | Endpoint::Tls(addr, _) => Ok(addr),
+ _ => Err(Error::TryFromEndpoint),
}
}
}
/// Addr defines a type for an address, either IP or domain.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Addr {
Ip(IpAddr),
Domain(String),
}
+impl Encode for Addr {
+ fn encode<E: bincode::enc::Encoder>(
+ &self,
+ encoder: &mut E,
+ ) -> std::result::Result<(), bincode::error::EncodeError> {
+ match self {
+ Addr::Ip(addr) => {
+ 0u32.encode(encoder)?;
+ addr.encode(encoder)
+ }
+ Addr::Domain(domain) => {
+ 1u32.encode(encoder)?;
+ domain.encode(encoder)
+ }
+ }
+ }
+}
+
+impl Decode for Addr {
+ fn decode<D: bincode::de::Decoder>(
+ decoder: &mut D,
+ ) -> std::result::Result<Self, bincode::error::DecodeError> {
+ match u32::decode(decoder)? {
+ 0 => Ok(Addr::Ip(IpAddr::decode(decoder)?)),
+ 1 => Ok(Addr::Domain(String::decode(decoder)?)),
+ found => Err(bincode::error::DecodeError::UnexpectedVariant {
+ allowed: &bincode::error::AllowedEnumVariants::Range { min: 0, max: 1 },
+ found,
+ type_name: core::any::type_name::<Addr>(),
+ }),
+ }
+ }
+}
+impl_borrow_decode!(Addr);
+
impl TryFrom<Addr> for IpAddr {
type Error = Error;
fn try_from(addr: Addr) -> std::result::Result<IpAddr, Self::Error> {
diff --git a/net/src/error.rs b/net/src/error.rs
index 346184a..5dd6348 100644
--- a/net/src/error.rs
+++ b/net/src/error.rs
@@ -8,7 +8,7 @@ pub enum Error {
IO(#[from] std::io::Error),
#[error("Try from endpoint Error")]
- TryFromEndpointError,
+ TryFromEndpoint,
#[error("invalid address {0}")]
InvalidAddress(String),
@@ -28,6 +28,12 @@ pub enum Error {
#[error(transparent)]
ChannelRecv(#[from] smol::channel::RecvError),
+ #[error("Tls Error: {0}")]
+ Rustls(#[from] async_rustls::rustls::Error),
+
+ #[error("Invalid DNS Name: {0}")]
+ InvalidDnsNameError(#[from] async_rustls::rustls::client::InvalidDnsNameError),
+
#[error(transparent)]
KaryonsCore(#[from] karyons_core::error::Error),
}
diff --git a/net/src/lib.rs b/net/src/lib.rs
index 0e4c361..61069ef 100644
--- a/net/src/lib.rs
+++ b/net/src/lib.rs
@@ -10,6 +10,7 @@ pub use {
listener::{listen, Listener},
transports::{
tcp::{dial_tcp, listen_tcp, TcpConn},
+ tls,
udp::{dial_udp, listen_udp, UdpConn},
unix::{dial_unix, listen_unix, UnixConn},
},
diff --git a/net/src/listener.rs b/net/src/listener.rs
index 31a63ae..c6c3d94 100644
--- a/net/src/listener.rs
+++ b/net/src/listener.rs
@@ -1,9 +1,8 @@
-use crate::{Endpoint, Error, Result};
use async_trait::async_trait;
use crate::{
transports::{tcp, unix},
- Conn,
+ Conn, Endpoint, Error, Result,
};
/// Listener is a generic network listener.
@@ -15,7 +14,7 @@ pub trait Listener: Send + Sync {
/// Listens to the provided endpoint.
///
-/// it only supports `tcp4/6` and `unix`.
+/// it only supports `tcp4/6`, and `unix`.
///
/// #Example
///
diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs
index f399133..ac23021 100644
--- a/net/src/transports/mod.rs
+++ b/net/src/transports/mod.rs
@@ -1,3 +1,4 @@
pub mod tcp;
+pub mod tls;
pub mod udp;
pub mod unix;
diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs
index 84aa980..37f00a7 100644
--- a/net/src/transports/tcp.rs
+++ b/net/src/transports/tcp.rs
@@ -13,7 +13,7 @@ use crate::{
Error, Result,
};
-/// TCP network connection implementations of the [`Connection`] trait.
+/// TCP network connection implementation of the [`Connection`] trait.
pub struct TcpConn {
inner: TcpStream,
read: Mutex<ReadHalf<TcpStream>>,
diff --git a/net/src/transports/tls.rs b/net/src/transports/tls.rs
new file mode 100644
index 0000000..01bb5aa
--- /dev/null
+++ b/net/src/transports/tls.rs
@@ -0,0 +1,140 @@
+use std::sync::Arc;
+
+use async_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream};
+use async_trait::async_trait;
+use smol::{
+ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
+ lock::Mutex,
+ net::{TcpListener, TcpStream},
+};
+
+use crate::{
+ connection::Connection,
+ endpoint::{Addr, Endpoint, Port},
+ listener::Listener,
+ Error, Result,
+};
+
+/// TLS network connection implementation of the [`Connection`] trait.
+pub struct TlsConn {
+ inner: TcpStream,
+ read: Mutex<ReadHalf<TlsStream<TcpStream>>>,
+ write: Mutex<WriteHalf<TlsStream<TcpStream>>>,
+}
+
+impl TlsConn {
+ /// Creates a new TlsConn
+ pub fn new(sock: TcpStream, conn: TlsStream<TcpStream>) -> Self {
+ let (read, write) = split(conn);
+ Self {
+ inner: sock,
+ read: Mutex::new(read),
+ write: Mutex::new(write),
+ }
+ }
+}
+
+#[async_trait]
+impl Connection for TlsConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tls_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tls_addr(&self.inner.local_addr()?))
+ }
+
+ async fn read(&self, buf: &mut [u8]) -> Result<usize> {
+ self.read.lock().await.read(buf).await.map_err(Error::from)
+ }
+
+ async fn write(&self, buf: &[u8]) -> Result<usize> {
+ self.write
+ .lock()
+ .await
+ .write(buf)
+ .await
+ .map_err(Error::from)
+ }
+}
+
+/// Connects to the given TLS address and port.
+pub async fn dial_tls(
+ addr: &Addr,
+ port: &Port,
+ config: rustls::ClientConfig,
+ dns_name: &str,
+) -> Result<TlsConn> {
+ let address = format!("{}:{}", addr, port);
+
+ let connector = TlsConnector::from(Arc::new(config));
+
+ let sock = TcpStream::connect(&address).await?;
+ sock.set_nodelay(true)?;
+
+ let altname = rustls::ServerName::try_from(dns_name)?;
+ let conn = connector.connect(altname, sock.clone()).await?;
+ Ok(TlsConn::new(sock, TlsStream::Client(conn)))
+}
+
+/// Connects to the given TLS endpoint, returns `Conn` ([`Connection`]).
+pub async fn dial(
+ endpoint: &Endpoint,
+ config: rustls::ClientConfig,
+ dns_name: &str,
+) -> Result<Box<dyn Connection>> {
+ match endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => {}
+ _ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
+ }
+
+ dial_tls(endpoint.addr()?, endpoint.port()?, config, dns_name)
+ .await
+ .map(|c| Box::new(c) as Box<dyn Connection>)
+}
+/// Tls network listener implementation of the [`Listener`] trait.
+pub struct TlsListener {
+ acceptor: TlsAcceptor,
+ listener: TcpListener,
+}
+
+#[async_trait]
+impl Listener for TlsListener {
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tls_addr(&self.listener.local_addr()?))
+ }
+
+ async fn accept(&self) -> Result<Box<dyn Connection>> {
+ let (sock, _) = self.listener.accept().await?;
+ sock.set_nodelay(true)?;
+ let conn = self.acceptor.accept(sock.clone()).await?;
+ Ok(Box::new(TlsConn::new(sock, TlsStream::Server(conn))))
+ }
+}
+
+/// Listens on the given TLS address and port.
+pub async fn listen_tls(
+ addr: &Addr,
+ port: &Port,
+ config: rustls::ServerConfig,
+) -> Result<TlsListener> {
+ let address = format!("{}:{}", addr, port);
+ let acceptor = TlsAcceptor::from(Arc::new(config));
+ let listener = TcpListener::bind(&address).await?;
+ Ok(TlsListener { acceptor, listener })
+}
+
+/// Listens on the given TLS endpoint, returns [`Listener`].
+pub async fn listen(
+ endpoint: &Endpoint,
+ config: rustls::ServerConfig,
+) -> Result<Box<dyn Listener>> {
+ match endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => {}
+ _ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
+ }
+
+ listen_tls(endpoint.addr()?, endpoint.port()?, config)
+ .await
+ .map(|l| Box::new(l) as Box<dyn Listener>)
+}
diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs
index ca5b94d..8a2fbec 100644
--- a/net/src/transports/udp.rs
+++ b/net/src/transports/udp.rs
@@ -9,7 +9,7 @@ use crate::{
Error, Result,
};
-/// UDP network connection implementations of the [`Connection`] trait.
+/// UDP network connection implementation of the [`Connection`] trait.
pub struct UdpConn {
inner: UdpSocket,
}
diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs
index a720d91..e504934 100644
--- a/net/src/transports/unix.rs
+++ b/net/src/transports/unix.rs
@@ -8,7 +8,7 @@ use smol::{
use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Error, Result};
-/// Unix domain socket implementations of the [`Connection`] trait.
+/// Unix domain socket implementation of the [`Connection`] trait.
pub struct UnixConn {
inner: UnixStream,
read: Mutex<ReadHalf<UnixStream>>,
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 98b700e..315983b 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -20,6 +20,12 @@ thiserror = "1.0.47"
semver = "1.0.20"
sha2 = "0.10.8"
+# tls
+async-rustls = { version = "0.4.1", features = ["dangerous_configuration"] }
+rcgen = "0.11.3"
+yasna = "0.5.2"
+x509-parser = "0.15.1"
+
[[example]]
name = "peer"
path = "examples/peer.rs"
diff --git a/p2p/README.md b/p2p/README.md
index 5bdaf63..edc5fcd 100644
--- a/p2p/README.md
+++ b/p2p/README.md
@@ -115,11 +115,12 @@ impl Protocol for NewProtocol {
Whenever a new peer is added to the PeerPool, all the protocols, including
your custom protocols, will automatically start running with the newly connected peer.
-## Network Security
+## Network Security
-It's obvious that connections in karyons p2p are not secure at the moment, as
-it currently only supports TCP connections. However, we are currently working
-on adding support for TLS connections.
+Using TLS is possible for all inbound and outbound connections by enabling the
+boolean `enable_tls` field in the configuration. However, implementing TLS for
+a P2P network is not trivial and is still unstable, requiring a comprehensive
+audit.
## Usage
@@ -129,5 +130,5 @@ If you have tmux installed, you can run the network simulation script in the
examples directory to run 12 peers simultaneously.
```bash
-$ RUST_LOG=karyons=debug ./net_simulation.sh
+$ RUST_LOG=karyons=info ./net_simulation.sh
```
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 907ba06..d94bca4 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -7,11 +7,12 @@ use async_trait::async_trait;
use clap::Parser;
use smol::{channel, Executor};
+use karyons_core::key_pair::{KeyPair, KeyPairType};
use karyons_net::{Endpoint, Port};
use karyons_p2p::{
protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID},
- ArcPeer, Backend, Config, P2pError, PeerID, Version,
+ ArcPeer, Backend, Config, P2pError, Version,
};
use shared::run_executor;
@@ -102,7 +103,7 @@ fn main() {
let cli = Cli::parse();
// Create a PeerID based on the username.
- let peer_id = PeerID::new(cli.username.as_bytes());
+ let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
// Create the configuration for the backend.
let config = Config {
@@ -117,7 +118,7 @@ fn main() {
let ex = Arc::new(Executor::new());
// Create a new Backend
- let backend = Backend::new(peer_id, config, ex.clone());
+ let backend = Backend::new(&key_pair, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index fc48c2f..530d2d5 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -5,9 +5,10 @@ use std::sync::Arc;
use clap::Parser;
use smol::{channel, Executor};
+use karyons_core::key_pair::{KeyPair, KeyPairType};
use karyons_net::{Endpoint, Port};
-use karyons_p2p::{Backend, Config, PeerID};
+use karyons_p2p::{Backend, Config};
use shared::run_executor;
@@ -29,20 +30,13 @@ struct Cli {
/// Optional TCP/UDP port for the discovery service.
#[arg(short)]
discovery_port: Option<Port>,
-
- /// Optional user id
- #[arg(long)]
- userid: Option<String>,
}
fn main() {
env_logger::init();
let cli = Cli::parse();
- let peer_id = match cli.userid {
- Some(userid) => PeerID::new(userid.as_bytes()),
- None => PeerID::random(),
- };
+ let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
// Create the configuration for the backend.
let config = Config {
@@ -57,7 +51,7 @@ fn main() {
let ex = Arc::new(Executor::new());
// Create a new Backend
- let backend = Backend::new(peer_id, config, ex.clone());
+ let backend = Backend::new(&key_pair, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
diff --git a/p2p/examples/net_simulation.sh b/p2p/examples/net_simulation.sh
index 1a05adf..dd489e5 100755
--- a/p2p/examples/net_simulation.sh
+++ b/p2p/examples/net_simulation.sh
@@ -5,27 +5,27 @@ cargo build --release --example peer
tmux new-session -d -s karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer1'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30000' -d '30010'" Enter
tmux split-window -h -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer2'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30001' -d '30011' -b 'tcp://127.0.0.1:30010 ' " Enter
tmux split-window -h -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer3'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30002' -d '30012' -b 'tcp://127.0.0.1:30010'" Enter
tmux split-window -h -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer4'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30003' -d '30013' -b 'tcp://127.0.0.1:30010'" Enter
tmux split-window -h -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer5'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30004' -d '30014' -b 'tcp://127.0.0.1:30010'" Enter
tmux split-window -h -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer6'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-l 'tcp://127.0.0.1:30005' -d '30015' -b 'tcp://127.0.0.1:30010'" Enter
tmux select-layout even-horizontal
@@ -35,37 +35,37 @@ sleep 3;
tmux select-pane -t karyons_p2p:0.0
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer7'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30011'" Enter
tmux select-pane -t karyons_p2p:0.2
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer8'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30012' -p 'tcp://127.0.0.1:30005'" Enter
tmux select-pane -t karyons_p2p:0.4
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer9'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30013'" Enter
tmux select-pane -t karyons_p2p:0.6
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer10'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30014'" Enter
tmux select-pane -t karyons_p2p:0.8
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer11'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30015'" Enter
tmux select-pane -t karyons_p2p:0.10
tmux split-window -v -t karyons_p2p
-tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer12'\
+tmux send-keys -t karyons_p2p "../../target/release/examples/peer\
-b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30015' -b 'tcp://127.0.0.1:30011'" Enter
# tmux set-window-option -t karyons_p2p synchronize-panes on
diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs
index 5ff365d..b595b4a 100644
--- a/p2p/examples/peer.rs
+++ b/p2p/examples/peer.rs
@@ -5,9 +5,10 @@ use std::sync::Arc;
use clap::Parser;
use smol::{channel, Executor};
+use karyons_core::key_pair::{KeyPair, KeyPairType};
use karyons_net::{Endpoint, Port};
-use karyons_p2p::{Backend, Config, PeerID};
+use karyons_p2p::{Backend, Config};
use shared::run_executor;
@@ -29,20 +30,13 @@ struct Cli {
/// Optional TCP/UDP port for the discovery service.
#[arg(short)]
discovery_port: Option<Port>,
-
- /// Optional user id
- #[arg(long)]
- userid: Option<String>,
}
fn main() {
env_logger::init();
let cli = Cli::parse();
- let peer_id = match cli.userid {
- Some(userid) => PeerID::new(userid.as_bytes()),
- None => PeerID::random(),
- };
+ let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
// Create the configuration for the backend.
let config = Config {
@@ -57,7 +51,7 @@ fn main() {
let ex = Arc::new(Executor::new());
// Create a new Backend
- let backend = Backend::new(peer_id, config, ex.clone());
+ let backend = Backend::new(&key_pair, config, ex.clone());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
let handle = move || ctrlc_s.try_send(()).unwrap();
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 2e34f47..56d79f7 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use log::info;
-use karyons_core::{pubsub::Subscription, GlobalExecutor};
+use karyons_core::{key_pair::KeyPair, pubsub::Subscription, GlobalExecutor};
use crate::{
config::Config,
@@ -22,8 +22,8 @@ pub struct Backend {
/// The Configuration for the P2P network.
config: Arc<Config>,
- /// Peer ID.
- id: PeerID,
+ /// Identity Key pair
+ key_pair: KeyPair,
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
@@ -37,17 +37,34 @@ pub struct Backend {
impl Backend {
/// Creates a new Backend.
- pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend {
+ pub fn new(key_pair: &KeyPair, config: Config, ex: GlobalExecutor) -> ArcBackend {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new());
- let cq = ConnQueue::new();
-
- let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone());
-
- let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex);
+ let conn_queue = ConnQueue::new();
+
+ let peer_id = PeerID::try_from(key_pair.public())
+ .expect("Derive a peer id from the provided key pair.");
+ info!("PeerID: {}", peer_id);
+
+ let peer_pool = PeerPool::new(
+ &peer_id,
+ conn_queue.clone(),
+ config.clone(),
+ monitor.clone(),
+ ex.clone(),
+ );
+
+ let discovery = Discovery::new(
+ key_pair,
+ &peer_id,
+ conn_queue,
+ config.clone(),
+ monitor.clone(),
+ ex,
+ );
Arc::new(Self {
- id: id.clone(),
+ key_pair: key_pair.clone(),
monitor,
discovery,
config,
@@ -57,7 +74,6 @@ impl Backend {
/// Run the Backend, starting the PeerPool and Discovery instances.
pub async fn run(self: &Arc<Self>) -> Result<()> {
- info!("Run the backend {}", self.id);
self.peer_pool.start().await?;
self.discovery.start().await?;
Ok(())
@@ -81,6 +97,11 @@ impl Backend {
self.config.clone()
}
+ /// Returns the `KeyPair`.
+ pub async fn key_pair(&self) -> &KeyPair {
+ &self.key_pair
+ }
+
/// Returns the number of occupied inbound slots.
pub fn inbound_slots(&self) -> usize {
self.discovery.inbound_slots.load()
diff --git a/p2p/src/io_codec.rs b/p2p/src/codec.rs
index ea62666..e521824 100644
--- a/p2p/src/io_codec.rs
+++ b/p2p/src/codec.rs
@@ -3,8 +3,8 @@ use std::time::Duration;
use bincode::{Decode, Encode};
use karyons_core::{
- async_utils::timeout,
- utils::{decode, encode, encode_into_slice},
+ async_util::timeout,
+ util::{decode, encode, encode_into_slice},
};
use karyons_net::{Connection, NetError};
@@ -17,16 +17,16 @@ use crate::{
pub trait CodecMsg: Decode + Encode + std::fmt::Debug {}
impl<T: Encode + Decode + std::fmt::Debug> CodecMsg for T {}
-/// I/O codec working with generic network connections.
+/// A Codec working with generic network connections.
///
/// It is responsible for both decoding data received from the network and
/// encoding data before sending it.
-pub struct IOCodec {
+pub struct Codec {
conn: Box<dyn Connection>,
}
-impl IOCodec {
- /// Creates a new IOCodec.
+impl Codec {
+ /// Creates a new Codec.
pub fn new(conn: Box<dyn Connection>) -> Self {
Self { conn }
}
@@ -88,18 +88,6 @@ impl IOCodec {
.map_err(|_| NetError::Timeout)?
}
- /// Writes a message of type `T` with the given timeout.
- pub async fn write_timeout<T: CodecMsg>(
- &self,
- command: NetMsgCmd,
- msg: &T,
- duration: Duration,
- ) -> Result<()> {
- timeout(duration, self.write(command, msg))
- .await
- .map_err(|_| NetError::Timeout)?
- }
-
/// Reads the exact number of bytes required to fill `buf`.
async fn read_exact(&self, mut buf: &mut [u8]) -> Result<()> {
while !buf.is_empty() {
diff --git a/p2p/src/config.rs b/p2p/src/config.rs
index ebecbf0..2c5d5ec 100644
--- a/p2p/src/config.rs
+++ b/p2p/src/config.rs
@@ -1,6 +1,6 @@
use karyons_net::{Endpoint, Port};
-use crate::utils::Version;
+use crate::Version;
/// the Configuration for the P2P network.
pub struct Config {
@@ -71,6 +71,9 @@ pub struct Config {
/// The maximum number of retries for outbound connection establishment
/// during the refresh process.
pub refresh_connect_retries: usize,
+
+ /// Enables TLS for all connections.
+ pub enable_tls: bool,
}
impl Default for Config {
@@ -100,6 +103,8 @@ impl Default for Config {
refresh_interval: 1800,
refresh_response_timeout: 1,
refresh_connect_retries: 3,
+
+ enable_tls: false,
}
}
}
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index 8ec2617..e0a3bbd 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -2,7 +2,7 @@ use std::{collections::VecDeque, fmt, sync::Arc};
use smol::{channel::Sender, lock::Mutex};
-use karyons_core::async_utils::CondVar;
+use karyons_core::async_util::CondVar;
use karyons_net::Conn;
use crate::Result;
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index f41ab57..6fc5734 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -1,21 +1,28 @@
use std::{future::Future, sync::Arc};
-use log::{trace, warn};
+use log::{error, trace, warn};
use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
-use karyons_net::{dial, Conn, Endpoint, NetError};
+use karyons_net::{dial, tls, Conn, Endpoint, NetError};
use crate::{
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
- Result,
+ tls_config::tls_client_config,
+ Error, PeerID, Result,
};
+static DNS_NAME: &str = "karyons.org";
+
/// Responsible for creating outbound connections with other peers.
pub struct Connector {
+ /// Identity Key pair
+ key_pair: KeyPair,
+
/// Managing spawned tasks.
task_group: TaskGroup<'static>,
@@ -26,6 +33,9 @@ pub struct Connector {
/// establishing a connection.
max_retries: usize,
+ /// Enables secure connection.
+ enable_tls: bool,
+
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
}
@@ -33,16 +43,20 @@ pub struct Connector {
impl Connector {
/// Creates a new Connector
pub fn new(
+ key_pair: &KeyPair,
max_retries: usize,
connection_slots: Arc<ConnectionSlots>,
+ enable_tls: bool,
monitor: Arc<Monitor>,
ex: GlobalExecutor,
) -> Arc<Self> {
Arc::new(Self {
+ key_pair: key_pair.clone(),
+ max_retries,
task_group: TaskGroup::new(ex),
monitor,
connection_slots,
- max_retries,
+ enable_tls,
})
}
@@ -57,20 +71,23 @@ impl Connector {
/// `Conn` instance.
///
/// This method will block until it finds an available slot.
- pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> {
+ pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
self.connection_slots.wait_for_slot().await;
self.connection_slots.add();
let mut retry = 0;
let backoff = Backoff::new(500, 2000);
while retry < self.max_retries {
- let conn_result = dial(endpoint).await;
-
- if let Ok(conn) = conn_result {
- self.monitor
- .notify(&ConnEvent::Connected(endpoint.clone()).into())
- .await;
- return Ok(conn);
+ match self.dial(endpoint, peer_id).await {
+ Ok(conn) => {
+ self.monitor
+ .notify(&ConnEvent::Connected(endpoint.clone()).into())
+ .await;
+ return Ok(conn);
+ }
+ Err(err) => {
+ error!("Failed to establish a connection to {endpoint}: {err}");
+ }
}
self.monitor
@@ -96,12 +113,13 @@ impl Connector {
pub async fn connect_with_cback<Fut>(
self: &Arc<Self>,
endpoint: &Endpoint,
+ peer_id: &Option<PeerID>,
callback: impl FnOnce(Conn) -> Fut + Send + 'static,
) -> Result<()>
where
Fut: Future<Output = Result<()>> + Send + 'static,
{
- let conn = self.connect(endpoint).await?;
+ let conn = self.connect(endpoint, peer_id).await?;
let selfc = self.clone();
let endpoint = endpoint.clone();
@@ -120,4 +138,14 @@ impl Connector {
Ok(())
}
+
+ async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> {
+ if self.enable_tls {
+ let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?;
+ tls::dial(endpoint, tls_config, DNS_NAME).await
+ } else {
+ dial(endpoint).await
+ }
+ .map_err(Error::KaryonsNet)
+ }
}
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 0138068..60d8635 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -5,13 +5,13 @@ use log::{error, trace};
use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
use smol::lock::{Mutex, RwLock};
-use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor};
+use karyons_core::{async_util::timeout, key_pair::KeyPair, util::decode, GlobalExecutor};
use karyons_net::{Conn, Endpoint};
use crate::{
+ codec::Codec,
connector::Connector,
- io_codec::IOCodec,
listener::Listener,
message::{
get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg,
@@ -20,7 +20,7 @@ use crate::{
monitor::{ConnEvent, DiscoveryEvent, Monitor},
routing_table::RoutingTable,
slots::ConnectionSlots,
- utils::version_match,
+ version::version_match,
Config, Error, PeerID, Result,
};
@@ -55,6 +55,7 @@ pub struct LookupService {
impl LookupService {
/// Creates a new lookup service
pub fn new(
+ key_pair: &KeyPair,
id: &PeerID,
table: Arc<Mutex<RoutingTable>>,
config: Arc<Config>,
@@ -64,11 +65,19 @@ impl LookupService {
let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
- let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
+ let listener = Listener::new(
+ key_pair,
+ inbound_slots.clone(),
+ config.enable_tls,
+ monitor.clone(),
+ ex.clone(),
+ );
let connector = Connector::new(
+ key_pair,
config.lookup_connect_retries,
outbound_slots.clone(),
+ config.enable_tls,
monitor.clone(),
ex,
);
@@ -116,14 +125,17 @@ impl LookupService {
/// randomly generated peer ID. Upon receiving peers from the initial lookup,
/// it starts connecting to these received peers and sends them a FindPeer
/// message that contains our own peer ID.
- pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> {
+ pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
trace!("Lookup started {endpoint}");
self.monitor
.notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into())
.await;
let mut random_peers = vec![];
- if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await {
+ if let Err(err) = self
+ .random_lookup(endpoint, peer_id, &mut random_peers)
+ .await
+ {
self.monitor
.notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into())
.await;
@@ -160,11 +172,14 @@ impl LookupService {
async fn random_lookup(
&self,
endpoint: &Endpoint,
+ peer_id: Option<PeerID>,
random_peers: &mut Vec<PeerMsg>,
) -> Result<()> {
for _ in 0..2 {
- let peer_id = PeerID::random();
- let peers = self.connect(&peer_id, endpoint.clone()).await?;
+ let random_peer_id = PeerID::random();
+ let peers = self
+ .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
+ .await?;
let table = self.table.lock().await;
for peer in peers {
@@ -187,7 +202,7 @@ impl LookupService {
let mut tasks = FuturesUnordered::new();
for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
- tasks.push(self.connect(&self.id, endpoint))
+ tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
}
while let Some(result) = tasks.next().await {
@@ -200,11 +215,17 @@ impl LookupService {
}
}
- /// Connects to the given endpoint
- async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> {
- let conn = self.connector.connect(&endpoint).await?;
- let io_codec = IOCodec::new(conn);
- let result = self.handle_outbound(io_codec, peer_id).await;
+ /// Connects to the given endpoint and initiates a lookup process for the
+ /// provided peer ID.
+ async fn connect(
+ &self,
+ endpoint: Endpoint,
+ peer_id: Option<PeerID>,
+ target_peer_id: &PeerID,
+ ) -> Result<Vec<PeerMsg>> {
+ let conn = self.connector.connect(&endpoint, &peer_id).await?;
+ let io_codec = Codec::new(conn);
+ let result = self.handle_outbound(io_codec, target_peer_id).await;
self.monitor
.notify(&ConnEvent::Disconnected(endpoint).into())
@@ -215,12 +236,16 @@ impl LookupService {
}
/// Handles outbound connection
- async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> {
+ async fn handle_outbound(
+ &self,
+ io_codec: Codec,
+ target_peer_id: &PeerID,
+ ) -> Result<Vec<PeerMsg>> {
trace!("Send Ping msg");
self.send_ping_msg(&io_codec).await?;
trace!("Send FindPeer msg");
- let peers = self.send_findpeer_msg(&io_codec, peer_id).await?;
+ let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?;
if peers.0.len() >= MAX_PEERS_IN_PEERSMSG {
return Err(Error::Lookup("Received too many peers in PeersMsg"));
@@ -260,7 +285,7 @@ impl LookupService {
/// Handles inbound connection
async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> {
- let io_codec = IOCodec::new(conn);
+ let io_codec = Codec::new(conn);
loop {
let msg: NetMsg = io_codec.read().await?;
trace!("Receive msg {:?}", msg.header.command);
@@ -293,7 +318,7 @@ impl LookupService {
}
/// Sends a Ping msg and wait to receive the Pong message.
- async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> {
trace!("Send Pong msg");
let mut nonce: [u8; 32] = [0; 32];
@@ -319,14 +344,14 @@ impl LookupService {
}
/// Sends a Pong msg
- async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> {
+ async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> {
trace!("Send Pong msg");
io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?;
Ok(())
}
/// Sends a FindPeer msg and wait to receivet the Peers msg.
- async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> {
+ async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> {
trace!("Send FindPeer msg");
io_codec
.write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone()))
@@ -342,7 +367,7 @@ impl LookupService {
}
/// Sends a Peers msg.
- async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> {
+ async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> {
trace!("Send Peers msg");
let table = self.table.lock().await;
let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
@@ -354,7 +379,7 @@ impl LookupService {
}
/// Sends a Peer msg.
- async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> {
+ async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> {
trace!("Send Peer msg");
let peer_msg = PeerMsg {
addr: endpoint.addr()?.clone(),
@@ -367,7 +392,7 @@ impl LookupService {
}
/// Sends a Shutdown msg.
- async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> {
trace!("Send Shutdown msg");
io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?;
Ok(())
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 7f55309..2c1bcd8 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use smol::lock::Mutex;
use karyons_core::{
- async_utils::{Backoff, TaskGroup, TaskResult},
+ async_util::{Backoff, TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
@@ -66,6 +67,7 @@ pub struct Discovery {
impl Discovery {
/// Creates a new Discovery
pub fn new(
+ key_pair: &KeyPair,
peer_id: &PeerID,
conn_queue: Arc<ConnQueue>,
config: Arc<Config>,
@@ -81,6 +83,7 @@ impl Discovery {
let refresh_service =
RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone());
let lookup_service = LookupService::new(
+ key_pair,
peer_id,
table.clone(),
config.clone(),
@@ -89,12 +92,21 @@ impl Discovery {
);
let connector = Connector::new(
+ key_pair,
config.max_connect_retries,
outbound_slots.clone(),
+ config.enable_tls,
+ monitor.clone(),
+ ex.clone(),
+ );
+
+ let listener = Listener::new(
+ key_pair,
+ inbound_slots.clone(),
+ config.enable_tls,
monitor.clone(),
ex.clone(),
);
- let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone());
Arc::new(Self {
refresh_service: Arc::new(refresh_service),
@@ -222,7 +234,7 @@ impl Discovery {
selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await;
}
Err(Error::PeerAlreadyConnected) => {
- // TODO
+ // TODO: Use the appropriate status.
selfc.update_entry(&pid, DISCONNECTED_ENTRY).await;
}
Err(_) => {
@@ -236,10 +248,13 @@ impl Discovery {
Ok(())
};
- let res = self.connector.connect_with_cback(endpoint, cback).await;
+ let result = self
+ .connector
+ .connect_with_cback(endpoint, &pid, cback)
+ .await;
if let Some(pid) = &pid {
- match res {
+ match result {
Ok(_) => {
self.update_entry(pid, CONNECTED_ENTRY).await;
}
@@ -260,7 +275,8 @@ impl Discovery {
match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await {
Some(entry) => {
let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
- if let Err(err) = self.lookup_service.start_lookup(&endpoint).await {
+ let peer_id = Some(entry.key.into());
+ if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await;
error!("Failed to do lookup: {endpoint}: {err}");
}
@@ -268,7 +284,7 @@ impl Discovery {
None => {
let peers = &self.config.bootstrap_peers;
for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
- if let Err(err) = self.lookup_service.start_lookup(endpoint).await {
+ if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
error!("Failed to do lookup: {endpoint}: {err}");
}
}
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index d095f19..f797c71 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -10,8 +10,8 @@ use smol::{
};
use karyons_core::{
- async_utils::{timeout, Backoff, TaskGroup, TaskResult},
- utils::{decode, encode},
+ async_util::{timeout, Backoff, TaskGroup, TaskResult},
+ util::{decode, encode},
GlobalExecutor,
};
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index 0c1d50c..6274d4c 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -11,6 +11,9 @@ pub enum Error {
#[error("Unsupported protocol error: {0}")]
UnsupportedProtocol(String),
+ #[error("Try from public key Error: {0}")]
+ TryFromPublicKey(&'static str),
+
#[error("Invalid message error: {0}")]
InvalidMsg(String),
@@ -50,6 +53,21 @@ pub enum Error {
#[error("Peer already connected")]
PeerAlreadyConnected,
+ #[error("Yasna Error: {0}")]
+ Yasna(#[from] yasna::ASN1Error),
+
+ #[error("X509 Parser Error: {0}")]
+ X509Parser(#[from] x509_parser::error::X509Error),
+
+ #[error("Rcgen Error: {0}")]
+ Rcgen(#[from] rcgen::RcgenError),
+
+ #[error("Tls Error: {0}")]
+ Rustls(#[from] async_rustls::rustls::Error),
+
+ #[error("Invalid DNS Name: {0}")]
+ InvalidDnsNameError(#[from] async_rustls::rustls::client::InvalidDnsNameError),
+
#[error("Channel Send Error: {0}")]
ChannelSend(String),
diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs
index c0a3b5b..6585287 100644
--- a/p2p/src/lib.rs
+++ b/p2p/src/lib.rs
@@ -7,19 +7,19 @@
//! use easy_parallel::Parallel;
//! use smol::{channel as smol_channel, future, Executor};
//!
+//! use karyons_core::key_pair::{KeyPair, KeyPairType};
//! use karyons_p2p::{Backend, Config, PeerID};
//!
-//! let peer_id = PeerID::random();
+//! let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
//!
//! // Create the configuration for the backend.
//! let mut config = Config::default();
//!
-//!
//! // Create a new Executor
//! let ex = Arc::new(Executor::new());
//!
//! // Create a new Backend
-//! let backend = Backend::new(peer_id, config, ex.clone());
+//! let backend = Backend::new(&key_pair, config, ex.clone());
//!
//! let task = async {
//! // Run the backend
@@ -36,12 +36,12 @@
//! ```
//!
mod backend;
+mod codec;
mod config;
mod connection;
mod connector;
mod discovery;
mod error;
-mod io_codec;
mod listener;
mod message;
mod peer;
@@ -49,7 +49,8 @@ mod peer_pool;
mod protocols;
mod routing_table;
mod slots;
-mod utils;
+mod tls_config;
+mod version;
/// Responsible for network and system monitoring.
/// [`Read More`](./monitor/struct.Monitor.html)
@@ -62,6 +63,6 @@ pub use backend::{ArcBackend, Backend};
pub use config::Config;
pub use error::Error as P2pError;
pub use peer::{ArcPeer, PeerID};
-pub use utils::Version;
+pub use version::Version;
use error::{Error, Result};
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index f2391f7..58a0931 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -1,28 +1,36 @@
use std::{future::Future, sync::Arc};
-use log::{error, info, trace};
+use log::{debug, error, info};
use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
+ async_util::{TaskGroup, TaskResult},
+ key_pair::KeyPair,
GlobalExecutor,
};
-use karyons_net::{listen, Conn, Endpoint, Listener as NetListener};
+use karyons_net::{listen, tls, Conn, Endpoint, Listener as NetListener};
use crate::{
monitor::{ConnEvent, Monitor},
slots::ConnectionSlots,
- Result,
+ tls_config::tls_server_config,
+ Error, Result,
};
/// Responsible for creating inbound connections with other peers.
pub struct Listener {
+ /// Identity Key pair
+ key_pair: KeyPair,
+
/// Managing spawned tasks.
task_group: TaskGroup<'static>,
/// Manages available inbound slots.
connection_slots: Arc<ConnectionSlots>,
+ /// Enables secure connection.
+ enable_tls: bool,
+
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
}
@@ -30,13 +38,17 @@ pub struct Listener {
impl Listener {
/// Creates a new Listener
pub fn new(
+ key_pair: &KeyPair,
connection_slots: Arc<ConnectionSlots>,
+ enable_tls: bool,
monitor: Arc<Monitor>,
ex: GlobalExecutor,
) -> Arc<Self> {
Arc::new(Self {
+ key_pair: key_pair.clone(),
connection_slots,
task_group: TaskGroup::new(ex),
+ enable_tls,
monitor,
})
}
@@ -55,7 +67,7 @@ impl Listener {
where
Fut: Future<Output = Result<()>> + Send + 'static,
{
- let listener = match listen(&endpoint).await {
+ let listener = match self.listend(&endpoint).await {
Ok(listener) => {
self.monitor
.notify(&ConnEvent::Listening(endpoint.clone()).into())
@@ -67,21 +79,17 @@ impl Listener {
self.monitor
.notify(&ConnEvent::ListenFailed(endpoint).into())
.await;
- return Err(err.into());
+ return Err(err);
}
};
let resolved_endpoint = listener.local_endpoint()?;
- info!("Start listening on {endpoint}");
+ info!("Start listening on {resolved_endpoint}");
let selfc = self.clone();
self.task_group
- .spawn(selfc.listen_loop(listener, callback), |res| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Listen loop stopped: {endpoint} {err}");
- }
- });
+ .spawn(selfc.listen_loop(listener, callback), |_| async {});
Ok(resolved_endpoint)
}
@@ -94,8 +102,7 @@ impl Listener {
self: Arc<Self>,
listener: Box<dyn NetListener>,
callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static,
- ) -> Result<()>
- where
+ ) where
Fut: Future<Output = Result<()>> + Send + 'static,
{
loop {
@@ -103,27 +110,35 @@ impl Listener {
self.connection_slots.wait_for_slot().await;
let result = listener.accept().await;
- let conn = match result {
+ let (conn, endpoint) = match result {
Ok(c) => {
+ let endpoint = match c.peer_endpoint() {
+ Ok(e) => e,
+ Err(err) => {
+ self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
+ error!("Failed to accept a new connection: {err}");
+ continue;
+ }
+ };
+
self.monitor
- .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into())
+ .notify(&ConnEvent::Accepted(endpoint.clone()).into())
.await;
- c
+ (c, endpoint)
}
Err(err) => {
error!("Failed to accept a new connection: {err}");
self.monitor.notify(&ConnEvent::AcceptFailed.into()).await;
- return Err(err.into());
+ continue;
}
};
self.connection_slots.add();
let selfc = self.clone();
- let endpoint = conn.peer_endpoint()?;
let on_disconnect = |res| async move {
if let TaskResult::Completed(Err(err)) = res {
- trace!("Inbound connection dropped: {err}");
+ debug!("Inbound connection dropped: {err}");
}
selfc
.monitor
@@ -136,4 +151,14 @@ impl Listener {
self.task_group.spawn(callback(conn), on_disconnect);
}
}
+
+ async fn listend(&self, endpoint: &Endpoint) -> Result<Box<dyn NetListener>> {
+ if self.enable_tls {
+ let tls_config = tls_server_config(&self.key_pair)?;
+ tls::listen(endpoint, tls_config).await
+ } else {
+ listen(endpoint).await
+ }
+ .map_err(Error::KaryonsNet)
+ }
}
diff --git a/p2p/src/message.rs b/p2p/src/message.rs
index 3779cc1..6b23322 100644
--- a/p2p/src/message.rs
+++ b/p2p/src/message.rs
@@ -4,7 +4,7 @@ use bincode::{Decode, Encode};
use karyons_net::{Addr, Port};
-use crate::{protocol::ProtocolID, routing_table::Entry, utils::VersionInt, PeerID};
+use crate::{protocol::ProtocolID, routing_table::Entry, version::VersionInt, PeerID};
/// The size of the message header, in bytes.
pub const MSG_HEADER_SIZE: usize = 6;
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs
index fbbf43f..1f74503 100644
--- a/p2p/src/monitor.rs
+++ b/p2p/src/monitor.rs
@@ -17,6 +17,7 @@ use karyons_net::Endpoint;
///
/// use smol::Executor;
///
+/// use karyons_core::key_pair::{KeyPair, KeyPairType};
/// use karyons_p2p::{Config, Backend, PeerID};
///
/// async {
@@ -24,7 +25,8 @@ use karyons_net::Endpoint;
/// // Create a new Executor
/// let ex = Arc::new(Executor::new());
///
-/// let backend = Backend::new(PeerID::random(), Config::default(), ex);
+/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
+/// let backend = Backend::new(&key_pair, Config::default(), ex);
///
/// // Create a new Subscription
/// let sub = backend.monitor().await;
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 85cd558..6ed0dd8 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -11,17 +11,17 @@ use smol::{
};
use karyons_core::{
- async_utils::{select, Either, TaskGroup, TaskResult},
+ async_util::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
- utils::{decode, encode},
+ util::{decode, encode},
GlobalExecutor,
};
use karyons_net::Endpoint;
use crate::{
+ codec::{Codec, CodecMsg},
connection::ConnDirection,
- io_codec::{CodecMsg, IOCodec},
message::{NetMsgCmd, ProtocolMsg, ShutdownMsg},
peer_pool::{ArcPeerPool, WeakPeerPool},
protocol::{Protocol, ProtocolEvent, ProtocolID},
@@ -37,8 +37,8 @@ pub struct Peer {
/// A weak pointer to `PeerPool`
peer_pool: WeakPeerPool,
- /// Holds the IOCodec for the peer connection
- io_codec: IOCodec,
+ /// Holds the Codec for the peer connection
+ codec: Codec,
/// Remote endpoint for the peer
remote_endpoint: Endpoint,
@@ -64,7 +64,7 @@ impl Peer {
pub fn new(
peer_pool: WeakPeerPool,
id: &PeerID,
- io_codec: IOCodec,
+ codec: Codec,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
ex: GlobalExecutor,
@@ -72,7 +72,7 @@ impl Peer {
Arc::new(Peer {
id: id.clone(),
peer_pool,
- io_codec,
+ codec,
protocol_ids: RwLock::new(Vec::new()),
remote_endpoint,
conn_direction,
@@ -97,7 +97,7 @@ impl Peer {
payload: payload.to_vec(),
};
- self.io_codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
+ self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?;
Ok(())
}
@@ -124,10 +124,7 @@ impl Peer {
let _ = self.stop_chan.0.try_send(Ok(()));
// No need to handle the error here
- let _ = self
- .io_codec
- .write(NetMsgCmd::Shutdown, &ShutdownMsg(0))
- .await;
+ let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await;
// Force shutting down
self.task_group.cancel().await;
@@ -174,7 +171,7 @@ impl Peer {
/// Start a read loop to handle incoming messages from the peer connection.
async fn read_loop(&self) -> Result<()> {
loop {
- let fut = select(self.stop_chan.1.recv(), self.io_codec.read()).await;
+ let fut = select(self.stop_chan.1.recv(), self.codec.read()).await;
let result = match fut {
Either::Left(stop_signal) => {
trace!("Peer {} received a stop signal", self.id);
diff --git a/p2p/src/peer/peer_id.rs b/p2p/src/peer/peer_id.rs
index c8aec7d..903d827 100644
--- a/p2p/src/peer/peer_id.rs
+++ b/p2p/src/peer/peer_id.rs
@@ -2,6 +2,10 @@ use bincode::{Decode, Encode};
use rand::{rngs::OsRng, RngCore};
use sha2::{Digest, Sha256};
+use karyons_core::key_pair::PublicKey;
+
+use crate::Error;
+
/// Represents a unique identifier for a peer.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Decode, Encode)]
pub struct PeerID(pub [u8; 32]);
@@ -39,3 +43,16 @@ impl From<[u8; 32]> for PeerID {
PeerID(b)
}
}
+
+impl TryFrom<PublicKey> for PeerID {
+ type Error = Error;
+
+ fn try_from(pk: PublicKey) -> Result<Self, Self::Error> {
+ let pk: [u8; 32] = pk
+ .as_bytes()
+ .try_into()
+ .map_err(|_| Error::TryFromPublicKey("Failed to convert public key to [u8;32]"))?;
+
+ Ok(PeerID(pk))
+ }
+}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index a0079f2..dd7e669 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -11,23 +11,23 @@ use smol::{
};
use karyons_core::{
- async_utils::{TaskGroup, TaskResult},
- utils::decode,
+ async_util::{TaskGroup, TaskResult},
+ util::decode,
GlobalExecutor,
};
use karyons_net::Conn;
use crate::{
+ codec::{Codec, CodecMsg},
config::Config,
connection::{ConnDirection, ConnQueue},
- io_codec::{CodecMsg, IOCodec},
message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
monitor::{Monitor, PeerPoolEvent},
peer::{ArcPeer, Peer, PeerID},
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
- utils::{version_match, Version, VersionInt},
+ version::{version_match, Version, VersionInt},
Error, Result,
};
@@ -155,10 +155,10 @@ impl PeerPool {
disconnect_signal: Sender<Result<()>>,
) -> Result<()> {
let endpoint = conn.peer_endpoint()?;
- let io_codec = IOCodec::new(conn);
+ let codec = Codec::new(conn);
// Do a handshake with the connection before creating a new peer.
- let pid = self.do_handshake(&io_codec, conn_direction).await?;
+ let pid = self.do_handshake(&codec, conn_direction).await?;
// TODO: Consider restricting the subnet for inbound connections
if self.contains_peer(&pid).await {
@@ -169,7 +169,7 @@ impl PeerPool {
let peer = Peer::new(
Arc::downgrade(self),
&pid,
- io_codec,
+ codec,
endpoint.clone(),
conn_direction.clone(),
self.executor.clone(),
@@ -235,20 +235,16 @@ impl PeerPool {
}
/// Initiate a handshake with a connection.
- async fn do_handshake(
- &self,
- io_codec: &IOCodec,
- conn_direction: &ConnDirection,
- ) -> Result<PeerID> {
+ async fn do_handshake(&self, codec: &Codec, conn_direction: &ConnDirection) -> Result<PeerID> {
match conn_direction {
ConnDirection::Inbound => {
- let result = self.wait_vermsg(io_codec).await;
+ let result = self.wait_vermsg(codec).await;
match result {
Ok(_) => {
- self.send_verack(io_codec, true).await?;
+ self.send_verack(codec, true).await?;
}
Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => {
- self.send_verack(io_codec, false).await?;
+ self.send_verack(codec, false).await?;
}
_ => {}
}
@@ -256,14 +252,14 @@ impl PeerPool {
}
ConnDirection::Outbound => {
- self.send_vermsg(io_codec).await?;
- self.wait_verack(io_codec).await
+ self.send_vermsg(codec).await?;
+ self.wait_verack(codec).await
}
}
}
/// Send a Version message
- async fn send_vermsg(&self, io_codec: &IOCodec) -> Result<()> {
+ async fn send_vermsg(&self, codec: &Codec) -> Result<()> {
let pids = self.protocol_versions.read().await;
let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect();
drop(pids);
@@ -275,16 +271,16 @@ impl PeerPool {
};
trace!("Send VerMsg");
- io_codec.write(NetMsgCmd::Version, &vermsg).await?;
+ codec.write(NetMsgCmd::Version, &vermsg).await?;
Ok(())
}
/// Wait for a Version message
///
/// Returns the peer's ID upon successfully receiving the Version message.
- async fn wait_vermsg(&self, io_codec: &IOCodec) -> Result<PeerID> {
+ async fn wait_vermsg(&self, codec: &Codec) -> Result<PeerID> {
let timeout = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = io_codec.read_timeout(timeout).await?;
+ let msg: NetMsg = codec.read_timeout(timeout).await?;
let payload = get_msg_payload!(Version, msg);
let (vermsg, _) = decode::<VerMsg>(&payload)?;
@@ -300,23 +296,23 @@ impl PeerPool {
}
/// Send a Verack message
- async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> {
+ async fn send_verack(&self, codec: &Codec, ack: bool) -> Result<()> {
let verack = VerAckMsg {
peer_id: self.id.clone(),
ack,
};
trace!("Send VerAckMsg {:?}", verack);
- io_codec.write(NetMsgCmd::Verack, &verack).await?;
+ codec.write(NetMsgCmd::Verack, &verack).await?;
Ok(())
}
/// Wait for a Verack message
///
/// Returns the peer's ID upon successfully receiving the Verack message.
- async fn wait_verack(&self, io_codec: &IOCodec) -> Result<PeerID> {
+ async fn wait_verack(&self, codec: &Codec) -> Result<PeerID> {
let timeout = Duration::from_secs(self.config.handshake_timeout);
- let msg: NetMsg = io_codec.read_timeout(timeout).await?;
+ let msg: NetMsg = codec.read_timeout(timeout).await?;
let payload = get_msg_payload!(Verack, msg);
let (verack, _) = decode::<VerAckMsg>(&payload)?;
diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs
index 770b695..8ddc685 100644
--- a/p2p/src/protocol.rs
+++ b/p2p/src/protocol.rs
@@ -4,7 +4,7 @@ use async_trait::async_trait;
use karyons_core::{event::EventValue, Executor};
-use crate::{peer::ArcPeer, utils::Version, Result};
+use crate::{peer::ArcPeer, version::Version, Result};
pub type ArcProtocol = Arc<dyn Protocol>;
@@ -37,6 +37,7 @@ impl EventValue for ProtocolEvent {
/// use async_trait::async_trait;
/// use smol::Executor;
///
+/// use karyons_core::key_pair::{KeyPair, KeyPairType};
/// use karyons_p2p::{
/// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent},
/// Backend, PeerID, Config, Version, P2pError, ArcPeer};
@@ -84,14 +85,14 @@ impl EventValue for ProtocolEvent {
/// }
///
/// async {
-/// let peer_id = PeerID::random();
+/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
/// let config = Config::default();
///
/// // Create a new Executor
/// let ex = Arc::new(Executor::new());
///
/// // Create a new Backend
-/// let backend = Backend::new(peer_id, config, ex);
+/// let backend = Backend::new(&key_pair, config, ex);
///
/// // Attach the NewProtocol
/// let c = move |peer| NewProtocol::new(peer);
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index dc1b9a1..0a5488d 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -12,9 +12,9 @@ use smol::{
};
use karyons_core::{
- async_utils::{select, timeout, Either, TaskGroup, TaskResult},
+ async_util::{select, timeout, Either, TaskGroup, TaskResult},
event::EventListener,
- utils::decode,
+ util::decode,
Executor,
};
@@ -23,7 +23,7 @@ use karyons_net::NetError;
use crate::{
peer::ArcPeer,
protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID},
- utils::Version,
+ version::Version,
Result,
};
diff --git a/p2p/src/routing_table/entry.rs b/p2p/src/routing_table/entry.rs
index b3f219f..c5fa65d 100644
--- a/p2p/src/routing_table/entry.rs
+++ b/p2p/src/routing_table/entry.rs
@@ -20,7 +20,7 @@ pub struct Entry {
impl PartialEq for Entry {
fn eq(&self, other: &Self) -> bool {
- // XXX this should also compare both addresses (the self.addr == other.addr)
+ // TODO: this should also compare both addresses (the self.addr == other.addr)
self.key == other.key
}
}
diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs
index 5277c0a..cfc3128 100644
--- a/p2p/src/routing_table/mod.rs
+++ b/p2p/src/routing_table/mod.rs
@@ -1,5 +1,8 @@
+use std::net::IpAddr;
+
mod bucket;
mod entry;
+
pub use bucket::{
Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY,
PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
@@ -8,7 +11,7 @@ pub use entry::{xor_distance, Entry, Key};
use rand::{rngs::OsRng, seq::SliceRandom};
-use crate::utils::subnet_match;
+use karyons_net::Addr;
use bucket::BUCKET_SIZE;
use entry::KEY_SIZE;
@@ -262,6 +265,20 @@ impl RoutingTable {
}
}
+/// Check if two addresses belong to the same subnet.
+pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool {
+ match (addr, other_addr) {
+ (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => {
+ // TODO: Consider moving this to a different place
+ if other_ip.is_loopback() && ip.is_loopback() {
+ return false;
+ }
+ ip.octets()[0..3] == other_ip.octets()[0..3]
+ }
+ _ => false,
+ }
+}
+
#[cfg(test)]
mod tests {
use super::bucket::ALL_ENTRY;
diff --git a/p2p/src/slots.rs b/p2p/src/slots.rs
index 99f0a78..d3a1d0a 100644
--- a/p2p/src/slots.rs
+++ b/p2p/src/slots.rs
@@ -1,6 +1,6 @@
use std::sync::atomic::{AtomicUsize, Ordering};
-use karyons_core::async_utils::CondWait;
+use karyons_core::async_util::CondWait;
/// Manages available inbound and outbound slots.
pub struct ConnectionSlots {
diff --git a/p2p/src/tls_config.rs b/p2p/src/tls_config.rs
new file mode 100644
index 0000000..f3b231a
--- /dev/null
+++ b/p2p/src/tls_config.rs
@@ -0,0 +1,214 @@
+use std::sync::Arc;
+
+use async_rustls::rustls::{
+ self, cipher_suite::TLS13_CHACHA20_POLY1305_SHA256, client::ServerCertVerifier,
+ server::ClientCertVerifier, Certificate, CertificateError, Error::InvalidCertificate,
+ PrivateKey, SupportedCipherSuite, SupportedKxGroup, SupportedProtocolVersion,
+};
+use log::error;
+use x509_parser::{certificate::X509Certificate, parse_x509_certificate};
+
+use karyons_core::key_pair::{KeyPair, KeyPairType, PublicKey};
+
+use crate::{PeerID, Result};
+
+// NOTE: This code needs a comprehensive audit.
+
+static PROTOCOL_VERSIONS: &[&SupportedProtocolVersion] = &[&rustls::version::TLS13];
+static CIPHER_SUITES: &[SupportedCipherSuite] = &[TLS13_CHACHA20_POLY1305_SHA256];
+static KX_GROUPS: &[&SupportedKxGroup] = &[&rustls::kx_group::X25519];
+
+const BAD_SIGNATURE_ERR: rustls::Error = InvalidCertificate(CertificateError::BadSignature);
+const BAD_ENCODING_ERR: rustls::Error = InvalidCertificate(CertificateError::BadEncoding);
+
+/// Returns a TLS client configuration.
+pub fn tls_client_config(
+ key_pair: &KeyPair,
+ peer_id: Option<PeerID>,
+) -> Result<rustls::ClientConfig> {
+ let (cert, private_key) = generate_cert(key_pair)?;
+ let server_verifier = SrvrCertVerifier { peer_id };
+ let client_config = rustls::ClientConfig::builder()
+ .with_cipher_suites(CIPHER_SUITES)
+ .with_kx_groups(KX_GROUPS)
+ .with_protocol_versions(PROTOCOL_VERSIONS)?
+ .with_custom_certificate_verifier(Arc::new(server_verifier))
+ .with_client_auth_cert(vec![cert], private_key)?;
+
+ Ok(client_config)
+}
+
+/// Returns a TLS server configuration.
+pub fn tls_server_config(key_pair: &KeyPair) -> Result<rustls::ServerConfig> {
+ let (cert, private_key) = generate_cert(key_pair)?;
+ let client_verifier = CliCertVerifier {};
+ let server_config = rustls::ServerConfig::builder()
+ .with_cipher_suites(CIPHER_SUITES)
+ .with_kx_groups(KX_GROUPS)
+ .with_protocol_versions(PROTOCOL_VERSIONS)?
+ .with_client_cert_verifier(Arc::new(client_verifier))
+ .with_single_cert(vec![cert], private_key)?;
+
+ Ok(server_config)
+}
+
+/// Generates a certificate and returns both the certificate and the private key.
+fn generate_cert(key_pair: &KeyPair) -> Result<(Certificate, PrivateKey)> {
+ let cert_key_pair = rcgen::KeyPair::generate(&rcgen::PKCS_ED25519)?;
+ let private_key = rustls::PrivateKey(cert_key_pair.serialize_der());
+
+ // Add a custom extension to the certificate:
+ // - Sign the certificate's public key with the provided key pair's public key
+ // - Append both the signature and the key pair's public key to the extension
+ let signature = key_pair.sign(&cert_key_pair.public_key_der());
+ let ext_content = yasna::encode_der(&(key_pair.public().as_bytes().to_vec(), signature));
+ // XXX: Not sure about the oid number ???
+ let mut ext = rcgen::CustomExtension::from_oid_content(&[0, 0, 0, 0], ext_content);
+ ext.set_criticality(true);
+
+ let mut params = rcgen::CertificateParams::new(vec![]);
+ params.alg = &rcgen::PKCS_ED25519;
+ params.key_pair = Some(cert_key_pair);
+ params.custom_extensions.push(ext);
+
+ let cert = rustls::Certificate(rcgen::Certificate::from_params(params)?.serialize_der()?);
+ Ok((cert, private_key))
+}
+
+/// Verifies the given certification.
+fn verify_cert(end_entity: &Certificate) -> std::result::Result<PeerID, rustls::Error> {
+ // Parse the certificate.
+ let cert = parse_cert(end_entity)?;
+
+ match cert.extensions().first() {
+ Some(ext) => {
+ // Extract the peer id (public key) and the signature from the extension.
+ let (public_key, signature): (Vec<u8>, Vec<u8>) =
+ yasna::decode_der(ext.value).map_err(|_| BAD_ENCODING_ERR)?;
+
+ // Use the peer id (public key) to verify the extracted signature.
+ let public_key = PublicKey::from_bytes(&KeyPairType::Ed25519, &public_key)
+ .map_err(|_| BAD_ENCODING_ERR)?;
+ public_key
+ .verify(cert.public_key().raw, &signature)
+ .map_err(|_| BAD_SIGNATURE_ERR)?;
+
+ // Verify the certificate signature.
+ verify_cert_signature(
+ &cert,
+ cert.tbs_certificate.as_ref(),
+ cert.signature_value.as_ref(),
+ )?;
+
+ PeerID::try_from(public_key).map_err(|_| BAD_ENCODING_ERR)
+ }
+ None => Err(BAD_ENCODING_ERR),
+ }
+}
+
+/// Parses the given x509 certificate.
+fn parse_cert(end_entity: &Certificate) -> std::result::Result<X509Certificate, rustls::Error> {
+ let (_, cert) = parse_x509_certificate(end_entity.as_ref()).map_err(|_| BAD_ENCODING_ERR)?;
+
+ if !cert.validity().is_valid() {
+ return Err(InvalidCertificate(CertificateError::NotValidYet));
+ }
+
+ Ok(cert)
+}
+
+/// Verifies the signature of the given certificate.
+fn verify_cert_signature(
+ cert: &X509Certificate,
+ message: &[u8],
+ signature: &[u8],
+) -> std::result::Result<(), rustls::Error> {
+ let public_key = PublicKey::from_bytes(
+ &KeyPairType::Ed25519,
+ cert.tbs_certificate.subject_pki.subject_public_key.as_ref(),
+ )
+ .map_err(|_| BAD_ENCODING_ERR)?;
+
+ public_key
+ .verify(message, signature)
+ .map_err(|_| BAD_SIGNATURE_ERR)
+}
+
+struct SrvrCertVerifier {
+ peer_id: Option<PeerID>,
+}
+
+impl ServerCertVerifier for SrvrCertVerifier {
+ fn verify_server_cert(
+ &self,
+ end_entity: &Certificate,
+ _intermediates: &[Certificate],
+ _server_name: &rustls::ServerName,
+ _scts: &mut dyn Iterator<Item = &[u8]>,
+ _ocsp_response: &[u8],
+ _now: std::time::SystemTime,
+ ) -> std::result::Result<rustls::client::ServerCertVerified, rustls::Error> {
+ let peer_id = match verify_cert(end_entity) {
+ Ok(pid) => pid,
+ Err(err) => {
+ error!("Failed to verify cert: {err}");
+ return Err(err);
+ }
+ };
+
+ // Verify that the peer id in the certificate's extension matches the
+ // one the client intends to connect to.
+ // Both should be equal for establishing a fully secure connection.
+ if let Some(pid) = &self.peer_id {
+ if pid != &peer_id {
+ return Err(InvalidCertificate(
+ CertificateError::ApplicationVerificationFailure,
+ ));
+ }
+ }
+
+ Ok(rustls::client::ServerCertVerified::assertion())
+ }
+
+ fn verify_tls13_signature(
+ &self,
+ message: &[u8],
+ cert: &Certificate,
+ dss: &rustls::DigitallySignedStruct,
+ ) -> std::result::Result<rustls::client::HandshakeSignatureValid, rustls::Error> {
+ let cert = parse_cert(cert)?;
+ verify_cert_signature(&cert, message, dss.signature())?;
+ Ok(rustls::client::HandshakeSignatureValid::assertion())
+ }
+}
+
+struct CliCertVerifier {}
+impl ClientCertVerifier for CliCertVerifier {
+ fn verify_client_cert(
+ &self,
+ end_entity: &Certificate,
+ _intermediates: &[Certificate],
+ _now: std::time::SystemTime,
+ ) -> std::result::Result<rustls::server::ClientCertVerified, rustls::Error> {
+ if let Err(err) = verify_cert(end_entity) {
+ error!("Failed to verify cert: {err}");
+ return Err(err);
+ };
+ Ok(rustls::server::ClientCertVerified::assertion())
+ }
+
+ fn verify_tls13_signature(
+ &self,
+ message: &[u8],
+ cert: &Certificate,
+ dss: &rustls::DigitallySignedStruct,
+ ) -> std::result::Result<rustls::client::HandshakeSignatureValid, rustls::Error> {
+ let cert = parse_cert(cert)?;
+ verify_cert_signature(&cert, message, dss.signature())?;
+ Ok(rustls::client::HandshakeSignatureValid::assertion())
+ }
+
+ fn client_auth_root_subjects(&self) -> &[rustls::DistinguishedName] {
+ &[]
+ }
+}
diff --git a/p2p/src/utils/mod.rs b/p2p/src/utils/mod.rs
deleted file mode 100644
index e8ff9d0..0000000
--- a/p2p/src/utils/mod.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-mod version;
-
-pub use version::{version_match, Version, VersionInt};
-
-use std::net::IpAddr;
-
-use karyons_net::Addr;
-
-/// Check if two addresses belong to the same subnet.
-pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool {
- match (addr, other_addr) {
- (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => {
- // XXX Consider moving this to a different location
- if other_ip.is_loopback() && ip.is_loopback() {
- return false;
- }
- ip.octets()[0..3] == other_ip.octets()[0..3]
- }
- _ => false,
- }
-}
diff --git a/p2p/src/utils/version.rs b/p2p/src/version.rs
index a101b28..a101b28 100644
--- a/p2p/src/utils/version.rs
+++ b/p2p/src/version.rs