diff options
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | core/Cargo.toml | 6 | ||||
-rw-r--r-- | core/src/async_util/backoff.rs (renamed from core/src/async_utils/backoff.rs) | 2 | ||||
-rw-r--r-- | core/src/async_util/condvar.rs (renamed from core/src/async_utils/condvar.rs) | 4 | ||||
-rw-r--r-- | core/src/async_util/condwait.rs (renamed from core/src/async_utils/condwait.rs) | 2 | ||||
-rw-r--r-- | core/src/async_util/mod.rs (renamed from core/src/async_utils/mod.rs) | 0 | ||||
-rw-r--r-- | core/src/async_util/select.rs (renamed from core/src/async_utils/select.rs) | 2 | ||||
-rw-r--r-- | core/src/async_util/task_group.rs (renamed from core/src/async_utils/task_group.rs) | 2 | ||||
-rw-r--r-- | core/src/async_util/timeout.rs (renamed from core/src/async_utils/timeout.rs) | 2 | ||||
-rw-r--r-- | core/src/error.rs | 6 | ||||
-rw-r--r-- | core/src/event.rs | 2 | ||||
-rw-r--r-- | core/src/key_pair.rs | 189 | ||||
-rw-r--r-- | core/src/lib.rs | 9 | ||||
-rw-r--r-- | core/src/pubsub.rs | 2 | ||||
-rw-r--r-- | core/src/util/decode.rs (renamed from core/src/utils/decode.rs) | 0 | ||||
-rw-r--r-- | core/src/util/encode.rs (renamed from core/src/utils/encode.rs) | 0 | ||||
-rw-r--r-- | core/src/util/mod.rs (renamed from core/src/utils/mod.rs) | 0 | ||||
-rw-r--r-- | core/src/util/path.rs (renamed from core/src/utils/path.rs) | 0 | ||||
-rw-r--r-- | jsonrpc/src/client.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/codec.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/server.rs | 2 | ||||
-rw-r--r-- | net/Cargo.toml | 3 | ||||
-rw-r--r-- | net/src/connection.rs | 9 | ||||
-rw-r--r-- | net/src/endpoint.rs | 70 | ||||
-rw-r--r-- | net/src/error.rs | 8 | ||||
-rw-r--r-- | net/src/lib.rs | 1 | ||||
-rw-r--r-- | net/src/listener.rs | 5 | ||||
-rw-r--r-- | net/src/transports/mod.rs | 1 | ||||
-rw-r--r-- | net/src/transports/tcp.rs | 2 | ||||
-rw-r--r-- | net/src/transports/tls.rs | 140 | ||||
-rw-r--r-- | net/src/transports/udp.rs | 2 | ||||
-rw-r--r-- | net/src/transports/unix.rs | 2 | ||||
-rw-r--r-- | p2p/Cargo.toml | 6 | ||||
-rw-r--r-- | p2p/README.md | 11 | ||||
-rw-r--r-- | p2p/examples/chat.rs | 7 | ||||
-rw-r--r-- | p2p/examples/monitor.rs | 14 | ||||
-rwxr-xr-x | p2p/examples/net_simulation.sh | 24 | ||||
-rw-r--r-- | p2p/examples/peer.rs | 14 | ||||
-rw-r--r-- | p2p/src/backend.rs | 43 | ||||
-rw-r--r-- | p2p/src/codec.rs (renamed from p2p/src/io_codec.rs) | 24 | ||||
-rw-r--r-- | p2p/src/config.rs | 7 | ||||
-rw-r--r-- | p2p/src/connection.rs | 2 | ||||
-rw-r--r-- | p2p/src/connector.rs | 56 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 71 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 30 | ||||
-rw-r--r-- | p2p/src/discovery/refresh.rs | 4 | ||||
-rw-r--r-- | p2p/src/error.rs | 18 | ||||
-rw-r--r-- | p2p/src/lib.rs | 13 | ||||
-rw-r--r-- | p2p/src/listener.rs | 65 | ||||
-rw-r--r-- | p2p/src/message.rs | 2 | ||||
-rw-r--r-- | p2p/src/monitor.rs | 4 | ||||
-rw-r--r-- | p2p/src/peer/mod.rs | 23 | ||||
-rw-r--r-- | p2p/src/peer/peer_id.rs | 17 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 46 | ||||
-rw-r--r-- | p2p/src/protocol.rs | 7 | ||||
-rw-r--r-- | p2p/src/protocols/ping.rs | 6 | ||||
-rw-r--r-- | p2p/src/routing_table/entry.rs | 2 | ||||
-rw-r--r-- | p2p/src/routing_table/mod.rs | 19 | ||||
-rw-r--r-- | p2p/src/slots.rs | 2 | ||||
-rw-r--r-- | p2p/src/tls_config.rs | 214 | ||||
-rw-r--r-- | p2p/src/utils/mod.rs | 21 | ||||
-rw-r--r-- | p2p/src/version.rs (renamed from p2p/src/utils/version.rs) | 0 |
62 files changed, 999 insertions, 256 deletions
@@ -25,9 +25,9 @@ implementation for building collaborative software. ## Status This project is a work in progress. The current focus is on shipping karyons -crdt and karyons store, along with major changes to the network stack, -including TLS implementation. You can check the -[issues](https://github.com/karyons/karyons/issues) for updates on ongoing tasks. +crdt and karyons store, along with major changes to the network stack. You can +check the [issues](https://github.com/karyons/karyons/issues) for updates on +ongoing tasks. ## Docs diff --git a/core/Cargo.toml b/core/Cargo.toml index ab05288..5a99e2d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,9 +10,13 @@ edition.workspace = true smol = "1.3.0" pin-project-lite = "0.2.13" log = "0.4.20" -bincode = { version="2.0.0-rc.3", features = ["derive"]} +bincode = "2.0.0-rc.3" chrono = "0.4.30" rand = "0.8.5" thiserror = "1.0.47" dirs = "5.0.1" async-task = "4.5.0" +ed25519-dalek = { version = "2.1.0", features = ["rand_core"]} + + + diff --git a/core/src/async_utils/backoff.rs b/core/src/async_util/backoff.rs index f7e131d..a231229 100644 --- a/core/src/async_utils/backoff.rs +++ b/core/src/async_util/backoff.rs @@ -12,7 +12,7 @@ use smol::Timer; /// # Examples /// /// ``` -/// use karyons_core::async_utils::Backoff; +/// use karyons_core::async_util::Backoff; /// /// async { /// let backoff = Backoff::new(300, 3000); diff --git a/core/src/async_utils/condvar.rs b/core/src/async_util/condvar.rs index 814f78f..7396d0d 100644 --- a/core/src/async_utils/condvar.rs +++ b/core/src/async_util/condvar.rs @@ -8,7 +8,7 @@ use std::{ use smol::lock::MutexGuard; -use crate::utils::random_16; +use crate::util::random_16; /// CondVar is an async version of <https://doc.rust-lang.org/std/sync/struct.Condvar.html> /// @@ -19,7 +19,7 @@ use crate::utils::random_16; /// /// use smol::lock::Mutex; /// -/// use karyons_core::async_utils::CondVar; +/// use karyons_core::async_util::CondVar; /// /// async { /// diff --git a/core/src/async_utils/condwait.rs b/core/src/async_util/condwait.rs index e31fac3..cd4b269 100644 --- a/core/src/async_utils/condwait.rs +++ b/core/src/async_util/condwait.rs @@ -9,7 +9,7 @@ use super::CondVar; ///``` /// use std::sync::Arc; /// -/// use karyons_core::async_utils::CondWait; +/// use karyons_core::async_util::CondWait; /// /// async { /// let cond_wait = Arc::new(CondWait::new()); diff --git a/core/src/async_utils/mod.rs b/core/src/async_util/mod.rs index c871bad..c871bad 100644 --- a/core/src/async_utils/mod.rs +++ b/core/src/async_util/mod.rs diff --git a/core/src/async_utils/select.rs b/core/src/async_util/select.rs index 9fe3c77..8f2f7f6 100644 --- a/core/src/async_utils/select.rs +++ b/core/src/async_util/select.rs @@ -12,7 +12,7 @@ use smol::future::Future; /// ``` /// use std::future; /// -/// use karyons_core::async_utils::{select, Either}; +/// use karyons_core::async_util::{select, Either}; /// /// async { /// let fut1 = future::pending::<String>(); diff --git a/core/src/async_utils/task_group.rs b/core/src/async_util/task_group.rs index afc9648..3fc0cb7 100644 --- a/core/src/async_utils/task_group.rs +++ b/core/src/async_util/task_group.rs @@ -14,7 +14,7 @@ use super::{select, CondWait, Either}; /// /// use std::sync::Arc; /// -/// use karyons_core::async_utils::TaskGroup; +/// use karyons_core::async_util::TaskGroup; /// /// async { /// diff --git a/core/src/async_utils/timeout.rs b/core/src/async_util/timeout.rs index 7c55e1b..6ab35c4 100644 --- a/core/src/async_utils/timeout.rs +++ b/core/src/async_util/timeout.rs @@ -13,7 +13,7 @@ use crate::{error::Error, Result}; /// ``` /// use std::{future, time::Duration}; /// -/// use karyons_core::async_utils::timeout; +/// use karyons_core::async_util::timeout; /// /// async { /// let fut = future::pending::<()>(); diff --git a/core/src/error.rs b/core/src/error.rs index 63b45d3..7c547c4 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -7,12 +7,18 @@ pub enum Error { #[error(transparent)] IO(#[from] std::io::Error), + #[error("TryInto Error: {0}")] + TryInto(&'static str), + #[error("Timeout Error")] Timeout, #[error("Path Not Found Error: {0}")] PathNotFound(&'static str), + #[error(transparent)] + Ed25519(#[from] ed25519_dalek::ed25519::Error), + #[error("Channel Send Error: {0}")] ChannelSend(String), diff --git a/core/src/event.rs b/core/src/event.rs index 0503e88..f2c5510 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -12,7 +12,7 @@ use smol::{ lock::Mutex, }; -use crate::{utils::random_16, Result}; +use crate::{util::random_16, Result}; pub type ArcEventSys<T> = Arc<EventSys<T>>; pub type WeakEventSys<T> = Weak<EventSys<T>>; diff --git a/core/src/key_pair.rs b/core/src/key_pair.rs new file mode 100644 index 0000000..4016351 --- /dev/null +++ b/core/src/key_pair.rs @@ -0,0 +1,189 @@ +use ed25519_dalek::{Signer as _, Verifier as _}; +use rand::rngs::OsRng; + +use crate::{error::Error, Result}; + +/// key cryptography type +pub enum KeyPairType { + Ed25519, +} + +/// A Public key +pub struct PublicKey(PublicKeyInner); + +/// A Secret key +pub struct SecretKey(Vec<u8>); + +impl PublicKey { + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + /// Verify a signature on a message with this public key. + pub fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> { + self.0.verify(msg, signature) + } +} + +impl PublicKey { + pub fn from_bytes(kp_type: &KeyPairType, pk: &[u8]) -> Result<Self> { + Ok(Self(PublicKeyInner::from_bytes(kp_type, pk)?)) + } +} + +/// A KeyPair. +#[derive(Clone)] +pub struct KeyPair(KeyPairInner); + +impl KeyPair { + /// Generate a new random keypair. + pub fn generate(kp_type: &KeyPairType) -> Self { + Self(KeyPairInner::generate(kp_type)) + } + + /// Sign a message using the private key. + pub fn sign(&self, msg: &[u8]) -> Vec<u8> { + self.0.sign(msg) + } + + /// Get the public key of this keypair. + pub fn public(&self) -> PublicKey { + self.0.public() + } + + /// Get the secret key of this keypair. + pub fn secret(&self) -> SecretKey { + self.0.secret() + } +} + +/// An extension trait, adding essential methods to all [`KeyPair`] types. +trait KeyPairExt { + /// Sign a message using the private key. + fn sign(&self, msg: &[u8]) -> Vec<u8>; + + /// Get the public key of this keypair. + fn public(&self) -> PublicKey; + + /// Get the secret key of this keypair. + fn secret(&self) -> SecretKey; +} + +#[derive(Clone)] +enum KeyPairInner { + Ed25519(Ed25519KeyPair), +} + +impl KeyPairInner { + fn generate(kp_type: &KeyPairType) -> Self { + match kp_type { + KeyPairType::Ed25519 => Self::Ed25519(Ed25519KeyPair::generate()), + } + } +} + +impl KeyPairExt for KeyPairInner { + fn sign(&self, msg: &[u8]) -> Vec<u8> { + match self { + KeyPairInner::Ed25519(kp) => kp.sign(msg), + } + } + + fn public(&self) -> PublicKey { + match self { + KeyPairInner::Ed25519(kp) => kp.public(), + } + } + + fn secret(&self) -> SecretKey { + match self { + KeyPairInner::Ed25519(kp) => kp.secret(), + } + } +} + +#[derive(Clone)] +struct Ed25519KeyPair(ed25519_dalek::SigningKey); + +impl Ed25519KeyPair { + fn generate() -> Self { + Self(ed25519_dalek::SigningKey::generate(&mut OsRng)) + } +} + +impl KeyPairExt for Ed25519KeyPair { + fn sign(&self, msg: &[u8]) -> Vec<u8> { + self.0.sign(msg).to_bytes().to_vec() + } + + fn public(&self) -> PublicKey { + PublicKey(PublicKeyInner::Ed25519(Ed25519PublicKey( + self.0.verifying_key(), + ))) + } + + fn secret(&self) -> SecretKey { + SecretKey(self.0.to_bytes().to_vec()) + } +} + +/// An extension trait, adding essential methods to all [`PublicKey`] types. +trait PublicKeyExt { + fn as_bytes(&self) -> &[u8]; + + /// Verify a signature on a message with this public key. + fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()>; +} + +enum PublicKeyInner { + Ed25519(Ed25519PublicKey), +} + +impl PublicKeyInner { + pub fn from_bytes(kp_type: &KeyPairType, pk: &[u8]) -> Result<Self> { + match kp_type { + KeyPairType::Ed25519 => Ok(Self::Ed25519(Ed25519PublicKey::from_bytes(pk)?)), + } + } +} + +impl PublicKeyExt for PublicKeyInner { + fn as_bytes(&self) -> &[u8] { + match self { + Self::Ed25519(pk) => pk.as_bytes(), + } + } + + fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> { + match self { + Self::Ed25519(pk) => pk.verify(msg, signature), + } + } +} + +struct Ed25519PublicKey(ed25519_dalek::VerifyingKey); + +impl Ed25519PublicKey { + pub fn from_bytes(pk: &[u8]) -> Result<Self> { + let pk_bytes: [u8; 32] = pk + .try_into() + .map_err(|_| Error::TryInto("Failed to convert slice to [u8; 32]"))?; + + Ok(Self(ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)?)) + } +} + +impl PublicKeyExt for Ed25519PublicKey { + fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + fn verify(&self, msg: &[u8], signature: &[u8]) -> Result<()> { + let sig_bytes: [u8; 64] = signature + .try_into() + .map_err(|_| Error::TryInto("Failed to convert slice to [u8; 64]"))?; + self.0 + .verify(msg, &ed25519_dalek::Signature::from_bytes(&sig_bytes))?; + Ok(()) + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 67e6610..276ed89 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,19 +1,22 @@ /// A set of helper tools and functions. -pub mod utils; +pub mod util; /// A module containing async utilities that work with the /// [`smol`](https://github.com/smol-rs/smol) async runtime. -pub mod async_utils; +pub mod async_util; /// Represents karyons's Core Error. pub mod error; -/// [`event::EventSys`] Implementation +/// [`event::EventSys`] implementation. pub mod event; /// A simple publish-subscribe system [`Read More`](./pubsub/struct.Publisher.html) pub mod pubsub; +/// A cryptographic key pair +pub mod key_pair; + use smol::Executor as SmolEx; use std::sync::Arc; diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs index 4cc0ab7..306d42f 100644 --- a/core/src/pubsub.rs +++ b/core/src/pubsub.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use log::error; use smol::lock::Mutex; -use crate::{utils::random_16, Result}; +use crate::{util::random_16, Result}; pub type ArcPublisher<T> = Arc<Publisher<T>>; pub type SubscriptionID = u16; diff --git a/core/src/utils/decode.rs b/core/src/util/decode.rs index a8a6522..a8a6522 100644 --- a/core/src/utils/decode.rs +++ b/core/src/util/decode.rs diff --git a/core/src/utils/encode.rs b/core/src/util/encode.rs index 7d1061b..7d1061b 100644 --- a/core/src/utils/encode.rs +++ b/core/src/util/encode.rs diff --git a/core/src/utils/mod.rs b/core/src/util/mod.rs index a3c3f50..a3c3f50 100644 --- a/core/src/utils/mod.rs +++ b/core/src/util/mod.rs diff --git a/core/src/utils/path.rs b/core/src/util/path.rs index 2cd900a..2cd900a 100644 --- a/core/src/utils/path.rs +++ b/core/src/util/path.rs diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index f5277aa..939d177 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -1,7 +1,7 @@ use log::debug; use serde::{de::DeserializeOwned, Serialize}; -use karyons_core::utils::random_32; +use karyons_core::util::random_32; use karyons_net::{dial, Conn, Endpoint}; use crate::{ diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index e198a6e..5dac8da 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,6 +1,6 @@ use memchr::memchr; -use karyons_core::async_utils::timeout; +use karyons_core::async_util::timeout; use karyons_net::Conn; use crate::{Error, Result}; diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 5b9b799..05ef7da 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -4,7 +4,7 @@ use log::{debug, error, warn}; use smol::lock::RwLock; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, + async_util::{TaskGroup, TaskResult}, Executor, }; use karyons_net::{listen, Conn, Endpoint, Listener}; diff --git a/net/Cargo.toml b/net/Cargo.toml index de9b33b..863a250 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -11,6 +11,7 @@ karyons_core.workspace = true smol = "1.3.0" async-trait = "0.1.73" log = "0.4.20" -bincode = { version="2.0.0-rc.3", features = ["derive"]} +bincode = "2.0.0-rc.3" thiserror = "1.0.47" url = "2.4.1" +async-rustls = { version = "0.4.1", features = ["dangerous_configuration"] } diff --git a/net/src/connection.rs b/net/src/connection.rs index d8ec0a3..b1d7550 100644 --- a/net/src/connection.rs +++ b/net/src/connection.rs @@ -1,7 +1,9 @@ -use crate::{Endpoint, Result}; use async_trait::async_trait; -use crate::transports::{tcp, udp, unix}; +use crate::{ + transports::{tcp, udp, unix}, + Endpoint, Error, Result, +}; /// Alias for `Box<dyn Connection>` pub type Conn = Box<dyn Connection>; @@ -28,7 +30,7 @@ pub trait Connection: Send + Sync { /// Connects to the provided endpoint. /// -/// it only supports `tcp4/6`, `udp4/6` and `unix`. +/// it only supports `tcp4/6`, `udp4/6`, and `unix`. /// /// #Example /// @@ -53,5 +55,6 @@ pub async fn dial(endpoint: &Endpoint) -> Result<Conn> { Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::dial_tcp(addr, port).await?)), Endpoint::Udp(addr, port) => Ok(Box::new(udp::dial_udp(addr, port).await?)), Endpoint::Unix(addr) => Ok(Box::new(unix::dial_unix(addr).await?)), + _ => Err(Error::InvalidEndpoint(endpoint.to_string())), } } diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs index 50dfe6b..720eea3 100644 --- a/net/src/endpoint.rs +++ b/net/src/endpoint.rs @@ -5,7 +5,7 @@ use std::{ str::FromStr, }; -use bincode::{Decode, Encode}; +use bincode::{impl_borrow_decode, Decode, Encode}; use url::Url; use crate::{Error, Result}; @@ -33,6 +33,7 @@ pub type Port = u16; pub enum Endpoint { Udp(Addr, Port), Tcp(Addr, Port), + Tls(Addr, Port), Unix(String), } @@ -45,6 +46,9 @@ impl std::fmt::Display for Endpoint { Endpoint::Tcp(ip, port) => { write!(f, "tcp://{}:{}", ip, port) } + Endpoint::Tls(ip, port) => { + write!(f, "tls://{}:{}", ip, port) + } Endpoint::Unix(path) => { if path.is_empty() { write!(f, "unix:/UNNAMED") @@ -60,9 +64,10 @@ impl TryFrom<Endpoint> for SocketAddr { type Error = Error; fn try_from(endpoint: Endpoint) -> std::result::Result<SocketAddr, Self::Error> { match endpoint { - Endpoint::Udp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), - Endpoint::Tcp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), - Endpoint::Unix(_) => Err(Error::TryFromEndpointError), + Endpoint::Udp(ip, port) | Endpoint::Tcp(ip, port) | Endpoint::Tls(ip, port) => { + Ok(SocketAddr::new(ip.try_into()?, port)) + } + Endpoint::Unix(_) => Err(Error::TryFromEndpoint), } } } @@ -72,7 +77,7 @@ impl TryFrom<Endpoint> for PathBuf { fn try_from(endpoint: Endpoint) -> std::result::Result<PathBuf, Self::Error> { match endpoint { Endpoint::Unix(path) => Ok(PathBuf::from(&path)), - _ => Err(Error::TryFromEndpointError), + _ => Err(Error::TryFromEndpoint), } } } @@ -82,7 +87,7 @@ impl TryFrom<Endpoint> for UnixSocketAddress { fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> { match endpoint { Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?), - _ => Err(Error::TryFromEndpointError), + _ => Err(Error::TryFromEndpoint), } } } @@ -112,6 +117,7 @@ impl FromStr for Endpoint { match url.scheme() { "tcp" => Ok(Endpoint::Tcp(addr, port)), "udp" => Ok(Endpoint::Udp(addr, port)), + "tls" => Ok(Endpoint::Tls(addr, port)), _ => Err(Error::InvalidEndpoint(s.to_string())), } } else { @@ -133,6 +139,11 @@ impl Endpoint { Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) } + /// Creates a new TLS endpoint from a `SocketAddr`. + pub fn new_tls_addr(addr: &SocketAddr) -> Endpoint { + Endpoint::Tls(Addr::Ip(addr.ip()), addr.port()) + } + /// Creates a new UDP endpoint from a `SocketAddr`. pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint { Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) @@ -151,29 +162,62 @@ impl Endpoint { /// Returns the `Port` of the endpoint. pub fn port(&self) -> Result<&Port> { match self { - Endpoint::Tcp(_, port) => Ok(port), - Endpoint::Udp(_, port) => Ok(port), - _ => Err(Error::TryFromEndpointError), + Endpoint::Udp(_, port) | Endpoint::Tcp(_, port) | Endpoint::Tls(_, port) => Ok(port), + _ => Err(Error::TryFromEndpoint), } } /// Returns the `Addr` of the endpoint. pub fn addr(&self) -> Result<&Addr> { match self { - Endpoint::Tcp(addr, _) => Ok(addr), - Endpoint::Udp(addr, _) => Ok(addr), - _ => Err(Error::TryFromEndpointError), + Endpoint::Udp(addr, _) | Endpoint::Tcp(addr, _) | Endpoint::Tls(addr, _) => Ok(addr), + _ => Err(Error::TryFromEndpoint), } } } /// Addr defines a type for an address, either IP or domain. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Addr { Ip(IpAddr), Domain(String), } +impl Encode for Addr { + fn encode<E: bincode::enc::Encoder>( + &self, + encoder: &mut E, + ) -> std::result::Result<(), bincode::error::EncodeError> { + match self { + Addr::Ip(addr) => { + 0u32.encode(encoder)?; + addr.encode(encoder) + } + Addr::Domain(domain) => { + 1u32.encode(encoder)?; + domain.encode(encoder) + } + } + } +} + +impl Decode for Addr { + fn decode<D: bincode::de::Decoder>( + decoder: &mut D, + ) -> std::result::Result<Self, bincode::error::DecodeError> { + match u32::decode(decoder)? { + 0 => Ok(Addr::Ip(IpAddr::decode(decoder)?)), + 1 => Ok(Addr::Domain(String::decode(decoder)?)), + found => Err(bincode::error::DecodeError::UnexpectedVariant { + allowed: &bincode::error::AllowedEnumVariants::Range { min: 0, max: 1 }, + found, + type_name: core::any::type_name::<Addr>(), + }), + } + } +} +impl_borrow_decode!(Addr); + impl TryFrom<Addr> for IpAddr { type Error = Error; fn try_from(addr: Addr) -> std::result::Result<IpAddr, Self::Error> { diff --git a/net/src/error.rs b/net/src/error.rs index 346184a..5dd6348 100644 --- a/net/src/error.rs +++ b/net/src/error.rs @@ -8,7 +8,7 @@ pub enum Error { IO(#[from] std::io::Error), #[error("Try from endpoint Error")] - TryFromEndpointError, + TryFromEndpoint, #[error("invalid address {0}")] InvalidAddress(String), @@ -28,6 +28,12 @@ pub enum Error { #[error(transparent)] ChannelRecv(#[from] smol::channel::RecvError), + #[error("Tls Error: {0}")] + Rustls(#[from] async_rustls::rustls::Error), + + #[error("Invalid DNS Name: {0}")] + InvalidDnsNameError(#[from] async_rustls::rustls::client::InvalidDnsNameError), + #[error(transparent)] KaryonsCore(#[from] karyons_core::error::Error), } diff --git a/net/src/lib.rs b/net/src/lib.rs index 0e4c361..61069ef 100644 --- a/net/src/lib.rs +++ b/net/src/lib.rs @@ -10,6 +10,7 @@ pub use { listener::{listen, Listener}, transports::{ tcp::{dial_tcp, listen_tcp, TcpConn}, + tls, udp::{dial_udp, listen_udp, UdpConn}, unix::{dial_unix, listen_unix, UnixConn}, }, diff --git a/net/src/listener.rs b/net/src/listener.rs index 31a63ae..c6c3d94 100644 --- a/net/src/listener.rs +++ b/net/src/listener.rs @@ -1,9 +1,8 @@ -use crate::{Endpoint, Error, Result}; use async_trait::async_trait; use crate::{ transports::{tcp, unix}, - Conn, + Conn, Endpoint, Error, Result, }; /// Listener is a generic network listener. @@ -15,7 +14,7 @@ pub trait Listener: Send + Sync { /// Listens to the provided endpoint. /// -/// it only supports `tcp4/6` and `unix`. +/// it only supports `tcp4/6`, and `unix`. /// /// #Example /// diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs index f399133..ac23021 100644 --- a/net/src/transports/mod.rs +++ b/net/src/transports/mod.rs @@ -1,3 +1,4 @@ pub mod tcp; +pub mod tls; pub mod udp; pub mod unix; diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs index 84aa980..37f00a7 100644 --- a/net/src/transports/tcp.rs +++ b/net/src/transports/tcp.rs @@ -13,7 +13,7 @@ use crate::{ Error, Result, }; -/// TCP network connection implementations of the [`Connection`] trait. +/// TCP network connection implementation of the [`Connection`] trait. pub struct TcpConn { inner: TcpStream, read: Mutex<ReadHalf<TcpStream>>, diff --git a/net/src/transports/tls.rs b/net/src/transports/tls.rs new file mode 100644 index 0000000..01bb5aa --- /dev/null +++ b/net/src/transports/tls.rs @@ -0,0 +1,140 @@ +use std::sync::Arc; + +use async_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream}; +use async_trait::async_trait; +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::{TcpListener, TcpStream}, +}; + +use crate::{ + connection::Connection, + endpoint::{Addr, Endpoint, Port}, + listener::Listener, + Error, Result, +}; + +/// TLS network connection implementation of the [`Connection`] trait. +pub struct TlsConn { + inner: TcpStream, + read: Mutex<ReadHalf<TlsStream<TcpStream>>>, + write: Mutex<WriteHalf<TlsStream<TcpStream>>>, +} + +impl TlsConn { + /// Creates a new TlsConn + pub fn new(sock: TcpStream, conn: TlsStream<TcpStream>) -> Self { + let (read, write) = split(conn); + Self { + inner: sock, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for TlsConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tls_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tls_addr(&self.inner.local_addr()?)) + } + + async fn read(&self, buf: &mut [u8]) -> Result<usize> { + self.read.lock().await.read(buf).await.map_err(Error::from) + } + + async fn write(&self, buf: &[u8]) -> Result<usize> { + self.write + .lock() + .await + .write(buf) + .await + .map_err(Error::from) + } +} + +/// Connects to the given TLS address and port. +pub async fn dial_tls( + addr: &Addr, + port: &Port, + config: rustls::ClientConfig, + dns_name: &str, +) -> Result<TlsConn> { + let address = format!("{}:{}", addr, port); + + let connector = TlsConnector::from(Arc::new(config)); + + let sock = TcpStream::connect(&address).await?; + sock.set_nodelay(true)?; + + let altname = rustls::ServerName::try_from(dns_name)?; + let conn = connector.connect(altname, sock.clone()).await?; + Ok(TlsConn::new(sock, TlsStream::Client(conn))) +} + +/// Connects to the given TLS endpoint, returns `Conn` ([`Connection`]). +pub async fn dial( + endpoint: &Endpoint, + config: rustls::ClientConfig, + dns_name: &str, +) -> Result<Box<dyn Connection>> { + match endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => {} + _ => return Err(Error::InvalidEndpoint(endpoint.to_string())), + } + + dial_tls(endpoint.addr()?, endpoint.port()?, config, dns_name) + .await + .map(|c| Box::new(c) as Box<dyn Connection>) +} +/// Tls network listener implementation of the [`Listener`] trait. +pub struct TlsListener { + acceptor: TlsAcceptor, + listener: TcpListener, +} + +#[async_trait] +impl Listener for TlsListener { + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tls_addr(&self.listener.local_addr()?)) + } + + async fn accept(&self) -> Result<Box<dyn Connection>> { + let (sock, _) = self.listener.accept().await?; + sock.set_nodelay(true)?; + let conn = self.acceptor.accept(sock.clone()).await?; + Ok(Box::new(TlsConn::new(sock, TlsStream::Server(conn)))) + } +} + +/// Listens on the given TLS address and port. +pub async fn listen_tls( + addr: &Addr, + port: &Port, + config: rustls::ServerConfig, +) -> Result<TlsListener> { + let address = format!("{}:{}", addr, port); + let acceptor = TlsAcceptor::from(Arc::new(config)); + let listener = TcpListener::bind(&address).await?; + Ok(TlsListener { acceptor, listener }) +} + +/// Listens on the given TLS endpoint, returns [`Listener`]. +pub async fn listen( + endpoint: &Endpoint, + config: rustls::ServerConfig, +) -> Result<Box<dyn Listener>> { + match endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => {} + _ => return Err(Error::InvalidEndpoint(endpoint.to_string())), + } + + listen_tls(endpoint.addr()?, endpoint.port()?, config) + .await + .map(|l| Box::new(l) as Box<dyn Listener>) +} diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs index ca5b94d..8a2fbec 100644 --- a/net/src/transports/udp.rs +++ b/net/src/transports/udp.rs @@ -9,7 +9,7 @@ use crate::{ Error, Result, }; -/// UDP network connection implementations of the [`Connection`] trait. +/// UDP network connection implementation of the [`Connection`] trait. pub struct UdpConn { inner: UdpSocket, } diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index a720d91..e504934 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -8,7 +8,7 @@ use smol::{ use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Error, Result}; -/// Unix domain socket implementations of the [`Connection`] trait. +/// Unix domain socket implementation of the [`Connection`] trait. pub struct UnixConn { inner: UnixStream, read: Mutex<ReadHalf<UnixStream>>, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 98b700e..315983b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -20,6 +20,12 @@ thiserror = "1.0.47" semver = "1.0.20" sha2 = "0.10.8" +# tls +async-rustls = { version = "0.4.1", features = ["dangerous_configuration"] } +rcgen = "0.11.3" +yasna = "0.5.2" +x509-parser = "0.15.1" + [[example]] name = "peer" path = "examples/peer.rs" diff --git a/p2p/README.md b/p2p/README.md index 5bdaf63..edc5fcd 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -115,11 +115,12 @@ impl Protocol for NewProtocol { Whenever a new peer is added to the PeerPool, all the protocols, including your custom protocols, will automatically start running with the newly connected peer. -## Network Security +## Network Security -It's obvious that connections in karyons p2p are not secure at the moment, as -it currently only supports TCP connections. However, we are currently working -on adding support for TLS connections. +Using TLS is possible for all inbound and outbound connections by enabling the +boolean `enable_tls` field in the configuration. However, implementing TLS for +a P2P network is not trivial and is still unstable, requiring a comprehensive +audit. ## Usage @@ -129,5 +130,5 @@ If you have tmux installed, you can run the network simulation script in the examples directory to run 12 peers simultaneously. ```bash -$ RUST_LOG=karyons=debug ./net_simulation.sh +$ RUST_LOG=karyons=info ./net_simulation.sh ``` diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 907ba06..d94bca4 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -7,11 +7,12 @@ use async_trait::async_trait; use clap::Parser; use smol::{channel, Executor}; +use karyons_core::key_pair::{KeyPair, KeyPairType}; use karyons_net::{Endpoint, Port}; use karyons_p2p::{ protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID}, - ArcPeer, Backend, Config, P2pError, PeerID, Version, + ArcPeer, Backend, Config, P2pError, Version, }; use shared::run_executor; @@ -102,7 +103,7 @@ fn main() { let cli = Cli::parse(); // Create a PeerID based on the username. - let peer_id = PeerID::new(cli.username.as_bytes()); + let key_pair = KeyPair::generate(&KeyPairType::Ed25519); // Create the configuration for the backend. let config = Config { @@ -117,7 +118,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(peer_id, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index fc48c2f..530d2d5 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -5,9 +5,10 @@ use std::sync::Arc; use clap::Parser; use smol::{channel, Executor}; +use karyons_core::key_pair::{KeyPair, KeyPairType}; use karyons_net::{Endpoint, Port}; -use karyons_p2p::{Backend, Config, PeerID}; +use karyons_p2p::{Backend, Config}; use shared::run_executor; @@ -29,20 +30,13 @@ struct Cli { /// Optional TCP/UDP port for the discovery service. #[arg(short)] discovery_port: Option<Port>, - - /// Optional user id - #[arg(long)] - userid: Option<String>, } fn main() { env_logger::init(); let cli = Cli::parse(); - let peer_id = match cli.userid { - Some(userid) => PeerID::new(userid.as_bytes()), - None => PeerID::random(), - }; + let key_pair = KeyPair::generate(&KeyPairType::Ed25519); // Create the configuration for the backend. let config = Config { @@ -57,7 +51,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(peer_id, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); diff --git a/p2p/examples/net_simulation.sh b/p2p/examples/net_simulation.sh index 1a05adf..dd489e5 100755 --- a/p2p/examples/net_simulation.sh +++ b/p2p/examples/net_simulation.sh @@ -5,27 +5,27 @@ cargo build --release --example peer tmux new-session -d -s karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer1'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30000' -d '30010'" Enter tmux split-window -h -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer2'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30001' -d '30011' -b 'tcp://127.0.0.1:30010 ' " Enter tmux split-window -h -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer3'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30002' -d '30012' -b 'tcp://127.0.0.1:30010'" Enter tmux split-window -h -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer4'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30003' -d '30013' -b 'tcp://127.0.0.1:30010'" Enter tmux split-window -h -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer5'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30004' -d '30014' -b 'tcp://127.0.0.1:30010'" Enter tmux split-window -h -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer6'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -l 'tcp://127.0.0.1:30005' -d '30015' -b 'tcp://127.0.0.1:30010'" Enter tmux select-layout even-horizontal @@ -35,37 +35,37 @@ sleep 3; tmux select-pane -t karyons_p2p:0.0 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer7'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30011'" Enter tmux select-pane -t karyons_p2p:0.2 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer8'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30012' -p 'tcp://127.0.0.1:30005'" Enter tmux select-pane -t karyons_p2p:0.4 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer9'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30013'" Enter tmux select-pane -t karyons_p2p:0.6 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer10'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30014'" Enter tmux select-pane -t karyons_p2p:0.8 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer11'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30015'" Enter tmux select-pane -t karyons_p2p:0.10 tmux split-window -v -t karyons_p2p -tmux send-keys -t karyons_p2p "../../target/release/examples/peer --userid 'peer12'\ +tmux send-keys -t karyons_p2p "../../target/release/examples/peer\ -b 'tcp://127.0.0.1:30010' -b 'tcp://127.0.0.1:30015' -b 'tcp://127.0.0.1:30011'" Enter # tmux set-window-option -t karyons_p2p synchronize-panes on diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index 5ff365d..b595b4a 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -5,9 +5,10 @@ use std::sync::Arc; use clap::Parser; use smol::{channel, Executor}; +use karyons_core::key_pair::{KeyPair, KeyPairType}; use karyons_net::{Endpoint, Port}; -use karyons_p2p::{Backend, Config, PeerID}; +use karyons_p2p::{Backend, Config}; use shared::run_executor; @@ -29,20 +30,13 @@ struct Cli { /// Optional TCP/UDP port for the discovery service. #[arg(short)] discovery_port: Option<Port>, - - /// Optional user id - #[arg(long)] - userid: Option<String>, } fn main() { env_logger::init(); let cli = Cli::parse(); - let peer_id = match cli.userid { - Some(userid) => PeerID::new(userid.as_bytes()), - None => PeerID::random(), - }; + let key_pair = KeyPair::generate(&KeyPairType::Ed25519); // Create the configuration for the backend. let config = Config { @@ -57,7 +51,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(peer_id, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 2e34f47..56d79f7 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use log::info; -use karyons_core::{pubsub::Subscription, GlobalExecutor}; +use karyons_core::{key_pair::KeyPair, pubsub::Subscription, GlobalExecutor}; use crate::{ config::Config, @@ -22,8 +22,8 @@ pub struct Backend { /// The Configuration for the P2P network. config: Arc<Config>, - /// Peer ID. - id: PeerID, + /// Identity Key pair + key_pair: KeyPair, /// Responsible for network and system monitoring. monitor: Arc<Monitor>, @@ -37,17 +37,34 @@ pub struct Backend { impl Backend { /// Creates a new Backend. - pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend { + pub fn new(key_pair: &KeyPair, config: Config, ex: GlobalExecutor) -> ArcBackend { let config = Arc::new(config); let monitor = Arc::new(Monitor::new()); - let cq = ConnQueue::new(); - - let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone()); - - let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex); + let conn_queue = ConnQueue::new(); + + let peer_id = PeerID::try_from(key_pair.public()) + .expect("Derive a peer id from the provided key pair."); + info!("PeerID: {}", peer_id); + + let peer_pool = PeerPool::new( + &peer_id, + conn_queue.clone(), + config.clone(), + monitor.clone(), + ex.clone(), + ); + + let discovery = Discovery::new( + key_pair, + &peer_id, + conn_queue, + config.clone(), + monitor.clone(), + ex, + ); Arc::new(Self { - id: id.clone(), + key_pair: key_pair.clone(), monitor, discovery, config, @@ -57,7 +74,6 @@ impl Backend { /// Run the Backend, starting the PeerPool and Discovery instances. pub async fn run(self: &Arc<Self>) -> Result<()> { - info!("Run the backend {}", self.id); self.peer_pool.start().await?; self.discovery.start().await?; Ok(()) @@ -81,6 +97,11 @@ impl Backend { self.config.clone() } + /// Returns the `KeyPair`. + pub async fn key_pair(&self) -> &KeyPair { + &self.key_pair + } + /// Returns the number of occupied inbound slots. pub fn inbound_slots(&self) -> usize { self.discovery.inbound_slots.load() diff --git a/p2p/src/io_codec.rs b/p2p/src/codec.rs index ea62666..e521824 100644 --- a/p2p/src/io_codec.rs +++ b/p2p/src/codec.rs @@ -3,8 +3,8 @@ use std::time::Duration; use bincode::{Decode, Encode}; use karyons_core::{ - async_utils::timeout, - utils::{decode, encode, encode_into_slice}, + async_util::timeout, + util::{decode, encode, encode_into_slice}, }; use karyons_net::{Connection, NetError}; @@ -17,16 +17,16 @@ use crate::{ pub trait CodecMsg: Decode + Encode + std::fmt::Debug {} impl<T: Encode + Decode + std::fmt::Debug> CodecMsg for T {} -/// I/O codec working with generic network connections. +/// A Codec working with generic network connections. /// /// It is responsible for both decoding data received from the network and /// encoding data before sending it. -pub struct IOCodec { +pub struct Codec { conn: Box<dyn Connection>, } -impl IOCodec { - /// Creates a new IOCodec. +impl Codec { + /// Creates a new Codec. pub fn new(conn: Box<dyn Connection>) -> Self { Self { conn } } @@ -88,18 +88,6 @@ impl IOCodec { .map_err(|_| NetError::Timeout)? } - /// Writes a message of type `T` with the given timeout. - pub async fn write_timeout<T: CodecMsg>( - &self, - command: NetMsgCmd, - msg: &T, - duration: Duration, - ) -> Result<()> { - timeout(duration, self.write(command, msg)) - .await - .map_err(|_| NetError::Timeout)? - } - /// Reads the exact number of bytes required to fill `buf`. async fn read_exact(&self, mut buf: &mut [u8]) -> Result<()> { while !buf.is_empty() { diff --git a/p2p/src/config.rs b/p2p/src/config.rs index ebecbf0..2c5d5ec 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -1,6 +1,6 @@ use karyons_net::{Endpoint, Port}; -use crate::utils::Version; +use crate::Version; /// the Configuration for the P2P network. pub struct Config { @@ -71,6 +71,9 @@ pub struct Config { /// The maximum number of retries for outbound connection establishment /// during the refresh process. pub refresh_connect_retries: usize, + + /// Enables TLS for all connections. + pub enable_tls: bool, } impl Default for Config { @@ -100,6 +103,8 @@ impl Default for Config { refresh_interval: 1800, refresh_response_timeout: 1, refresh_connect_retries: 3, + + enable_tls: false, } } } diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs index 8ec2617..e0a3bbd 100644 --- a/p2p/src/connection.rs +++ b/p2p/src/connection.rs @@ -2,7 +2,7 @@ use std::{collections::VecDeque, fmt, sync::Arc}; use smol::{channel::Sender, lock::Mutex}; -use karyons_core::async_utils::CondVar; +use karyons_core::async_util::CondVar; use karyons_net::Conn; use crate::Result; diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index f41ab57..6fc5734 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -1,21 +1,28 @@ use std::{future::Future, sync::Arc}; -use log::{trace, warn}; +use log::{error, trace, warn}; use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; -use karyons_net::{dial, Conn, Endpoint, NetError}; +use karyons_net::{dial, tls, Conn, Endpoint, NetError}; use crate::{ monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, - Result, + tls_config::tls_client_config, + Error, PeerID, Result, }; +static DNS_NAME: &str = "karyons.org"; + /// Responsible for creating outbound connections with other peers. pub struct Connector { + /// Identity Key pair + key_pair: KeyPair, + /// Managing spawned tasks. task_group: TaskGroup<'static>, @@ -26,6 +33,9 @@ pub struct Connector { /// establishing a connection. max_retries: usize, + /// Enables secure connection. + enable_tls: bool, + /// Responsible for network and system monitoring. monitor: Arc<Monitor>, } @@ -33,16 +43,20 @@ pub struct Connector { impl Connector { /// Creates a new Connector pub fn new( + key_pair: &KeyPair, max_retries: usize, connection_slots: Arc<ConnectionSlots>, + enable_tls: bool, monitor: Arc<Monitor>, ex: GlobalExecutor, ) -> Arc<Self> { Arc::new(Self { + key_pair: key_pair.clone(), + max_retries, task_group: TaskGroup::new(ex), monitor, connection_slots, - max_retries, + enable_tls, }) } @@ -57,20 +71,23 @@ impl Connector { /// `Conn` instance. /// /// This method will block until it finds an available slot. - pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> { + pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> { self.connection_slots.wait_for_slot().await; self.connection_slots.add(); let mut retry = 0; let backoff = Backoff::new(500, 2000); while retry < self.max_retries { - let conn_result = dial(endpoint).await; - - if let Ok(conn) = conn_result { - self.monitor - .notify(&ConnEvent::Connected(endpoint.clone()).into()) - .await; - return Ok(conn); + match self.dial(endpoint, peer_id).await { + Ok(conn) => { + self.monitor + .notify(&ConnEvent::Connected(endpoint.clone()).into()) + .await; + return Ok(conn); + } + Err(err) => { + error!("Failed to establish a connection to {endpoint}: {err}"); + } } self.monitor @@ -96,12 +113,13 @@ impl Connector { pub async fn connect_with_cback<Fut>( self: &Arc<Self>, endpoint: &Endpoint, + peer_id: &Option<PeerID>, callback: impl FnOnce(Conn) -> Fut + Send + 'static, ) -> Result<()> where Fut: Future<Output = Result<()>> + Send + 'static, { - let conn = self.connect(endpoint).await?; + let conn = self.connect(endpoint, peer_id).await?; let selfc = self.clone(); let endpoint = endpoint.clone(); @@ -120,4 +138,14 @@ impl Connector { Ok(()) } + + async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> { + if self.enable_tls { + let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?; + tls::dial(endpoint, tls_config, DNS_NAME).await + } else { + dial(endpoint).await + } + .map_err(Error::KaryonsNet) + } } diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 0138068..60d8635 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -5,13 +5,13 @@ use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; use smol::lock::{Mutex, RwLock}; -use karyons_core::{async_utils::timeout, utils::decode, GlobalExecutor}; +use karyons_core::{async_util::timeout, key_pair::KeyPair, util::decode, GlobalExecutor}; use karyons_net::{Conn, Endpoint}; use crate::{ + codec::Codec, connector::Connector, - io_codec::IOCodec, listener::Listener, message::{ get_msg_payload, FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, @@ -20,7 +20,7 @@ use crate::{ monitor::{ConnEvent, DiscoveryEvent, Monitor}, routing_table::RoutingTable, slots::ConnectionSlots, - utils::version_match, + version::version_match, Config, Error, PeerID, Result, }; @@ -55,6 +55,7 @@ pub struct LookupService { impl LookupService { /// Creates a new lookup service pub fn new( + key_pair: &KeyPair, id: &PeerID, table: Arc<Mutex<RoutingTable>>, config: Arc<Config>, @@ -64,11 +65,19 @@ impl LookupService { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); - let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); + let listener = Listener::new( + key_pair, + inbound_slots.clone(), + config.enable_tls, + monitor.clone(), + ex.clone(), + ); let connector = Connector::new( + key_pair, config.lookup_connect_retries, outbound_slots.clone(), + config.enable_tls, monitor.clone(), ex, ); @@ -116,14 +125,17 @@ impl LookupService { /// randomly generated peer ID. Upon receiving peers from the initial lookup, /// it starts connecting to these received peers and sends them a FindPeer /// message that contains our own peer ID. - pub async fn start_lookup(&self, endpoint: &Endpoint) -> Result<()> { + pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> { trace!("Lookup started {endpoint}"); self.monitor .notify(&DiscoveryEvent::LookupStarted(endpoint.clone()).into()) .await; let mut random_peers = vec![]; - if let Err(err) = self.random_lookup(endpoint, &mut random_peers).await { + if let Err(err) = self + .random_lookup(endpoint, peer_id, &mut random_peers) + .await + { self.monitor .notify(&DiscoveryEvent::LookupFailed(endpoint.clone()).into()) .await; @@ -160,11 +172,14 @@ impl LookupService { async fn random_lookup( &self, endpoint: &Endpoint, + peer_id: Option<PeerID>, random_peers: &mut Vec<PeerMsg>, ) -> Result<()> { for _ in 0..2 { - let peer_id = PeerID::random(); - let peers = self.connect(&peer_id, endpoint.clone()).await?; + let random_peer_id = PeerID::random(); + let peers = self + .connect(endpoint.clone(), peer_id.clone(), &random_peer_id) + .await?; let table = self.table.lock().await; for peer in peers { @@ -187,7 +202,7 @@ impl LookupService { let mut tasks = FuturesUnordered::new(); for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) { let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port); - tasks.push(self.connect(&self.id, endpoint)) + tasks.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id)) } while let Some(result) = tasks.next().await { @@ -200,11 +215,17 @@ impl LookupService { } } - /// Connects to the given endpoint - async fn connect(&self, peer_id: &PeerID, endpoint: Endpoint) -> Result<Vec<PeerMsg>> { - let conn = self.connector.connect(&endpoint).await?; - let io_codec = IOCodec::new(conn); - let result = self.handle_outbound(io_codec, peer_id).await; + /// Connects to the given endpoint and initiates a lookup process for the + /// provided peer ID. + async fn connect( + &self, + endpoint: Endpoint, + peer_id: Option<PeerID>, + target_peer_id: &PeerID, + ) -> Result<Vec<PeerMsg>> { + let conn = self.connector.connect(&endpoint, &peer_id).await?; + let io_codec = Codec::new(conn); + let result = self.handle_outbound(io_codec, target_peer_id).await; self.monitor .notify(&ConnEvent::Disconnected(endpoint).into()) @@ -215,12 +236,16 @@ impl LookupService { } /// Handles outbound connection - async fn handle_outbound(&self, io_codec: IOCodec, peer_id: &PeerID) -> Result<Vec<PeerMsg>> { + async fn handle_outbound( + &self, + io_codec: Codec, + target_peer_id: &PeerID, + ) -> Result<Vec<PeerMsg>> { trace!("Send Ping msg"); self.send_ping_msg(&io_codec).await?; trace!("Send FindPeer msg"); - let peers = self.send_findpeer_msg(&io_codec, peer_id).await?; + let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?; if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { return Err(Error::Lookup("Received too many peers in PeersMsg")); @@ -260,7 +285,7 @@ impl LookupService { /// Handles inbound connection async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> { - let io_codec = IOCodec::new(conn); + let io_codec = Codec::new(conn); loop { let msg: NetMsg = io_codec.read().await?; trace!("Receive msg {:?}", msg.header.command); @@ -293,7 +318,7 @@ impl LookupService { } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> { trace!("Send Pong msg"); let mut nonce: [u8; 32] = [0; 32]; @@ -319,14 +344,14 @@ impl LookupService { } /// Sends a Pong msg - async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &IOCodec) -> Result<()> { + async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> { trace!("Send Pong msg"); io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; Ok(()) } /// Sends a FindPeer msg and wait to receivet the Peers msg. - async fn send_findpeer_msg(&self, io_codec: &IOCodec, peer_id: &PeerID) -> Result<PeersMsg> { + async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> { trace!("Send FindPeer msg"); io_codec .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) @@ -342,7 +367,7 @@ impl LookupService { } /// Sends a Peers msg. - async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &IOCodec) -> Result<()> { + async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> { trace!("Send Peers msg"); let table = self.table.lock().await; let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); @@ -354,7 +379,7 @@ impl LookupService { } /// Sends a Peer msg. - async fn send_peer_msg(&self, io_codec: &IOCodec, endpoint: Endpoint) -> Result<()> { + async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> { trace!("Send Peer msg"); let peer_msg = PeerMsg { addr: endpoint.addr()?.clone(), @@ -367,7 +392,7 @@ impl LookupService { } /// Sends a Shutdown msg. - async fn send_shutdown_msg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> { trace!("Send Shutdown msg"); io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; Ok(()) diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 7f55309..2c1bcd8 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -8,7 +8,8 @@ use rand::{rngs::OsRng, seq::SliceRandom}; use smol::lock::Mutex; use karyons_core::{ - async_utils::{Backoff, TaskGroup, TaskResult}, + async_util::{Backoff, TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; @@ -66,6 +67,7 @@ pub struct Discovery { impl Discovery { /// Creates a new Discovery pub fn new( + key_pair: &KeyPair, peer_id: &PeerID, conn_queue: Arc<ConnQueue>, config: Arc<Config>, @@ -81,6 +83,7 @@ impl Discovery { let refresh_service = RefreshService::new(config.clone(), table.clone(), monitor.clone(), ex.clone()); let lookup_service = LookupService::new( + key_pair, peer_id, table.clone(), config.clone(), @@ -89,12 +92,21 @@ impl Discovery { ); let connector = Connector::new( + key_pair, config.max_connect_retries, outbound_slots.clone(), + config.enable_tls, + monitor.clone(), + ex.clone(), + ); + + let listener = Listener::new( + key_pair, + inbound_slots.clone(), + config.enable_tls, monitor.clone(), ex.clone(), ); - let listener = Listener::new(inbound_slots.clone(), monitor.clone(), ex.clone()); Arc::new(Self { refresh_service: Arc::new(refresh_service), @@ -222,7 +234,7 @@ impl Discovery { selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; } Err(Error::PeerAlreadyConnected) => { - // TODO + // TODO: Use the appropriate status. selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; } Err(_) => { @@ -236,10 +248,13 @@ impl Discovery { Ok(()) }; - let res = self.connector.connect_with_cback(endpoint, cback).await; + let result = self + .connector + .connect_with_cback(endpoint, &pid, cback) + .await; if let Some(pid) = &pid { - match res { + match result { Ok(_) => { self.update_entry(pid, CONNECTED_ENTRY).await; } @@ -260,7 +275,8 @@ impl Discovery { match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); - if let Err(err) = self.lookup_service.start_lookup(&endpoint).await { + let peer_id = Some(entry.key.into()); + if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; error!("Failed to do lookup: {endpoint}: {err}"); } @@ -268,7 +284,7 @@ impl Discovery { None => { let peers = &self.config.bootstrap_peers; for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) { - if let Err(err) = self.lookup_service.start_lookup(endpoint).await { + if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await { error!("Failed to do lookup: {endpoint}: {err}"); } } diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index d095f19..f797c71 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -10,8 +10,8 @@ use smol::{ }; use karyons_core::{ - async_utils::{timeout, Backoff, TaskGroup, TaskResult}, - utils::{decode, encode}, + async_util::{timeout, Backoff, TaskGroup, TaskResult}, + util::{decode, encode}, GlobalExecutor, }; diff --git a/p2p/src/error.rs b/p2p/src/error.rs index 0c1d50c..6274d4c 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("Unsupported protocol error: {0}")] UnsupportedProtocol(String), + #[error("Try from public key Error: {0}")] + TryFromPublicKey(&'static str), + #[error("Invalid message error: {0}")] InvalidMsg(String), @@ -50,6 +53,21 @@ pub enum Error { #[error("Peer already connected")] PeerAlreadyConnected, + #[error("Yasna Error: {0}")] + Yasna(#[from] yasna::ASN1Error), + + #[error("X509 Parser Error: {0}")] + X509Parser(#[from] x509_parser::error::X509Error), + + #[error("Rcgen Error: {0}")] + Rcgen(#[from] rcgen::RcgenError), + + #[error("Tls Error: {0}")] + Rustls(#[from] async_rustls::rustls::Error), + + #[error("Invalid DNS Name: {0}")] + InvalidDnsNameError(#[from] async_rustls::rustls::client::InvalidDnsNameError), + #[error("Channel Send Error: {0}")] ChannelSend(String), diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index c0a3b5b..6585287 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -7,19 +7,19 @@ //! use easy_parallel::Parallel; //! use smol::{channel as smol_channel, future, Executor}; //! +//! use karyons_core::key_pair::{KeyPair, KeyPairType}; //! use karyons_p2p::{Backend, Config, PeerID}; //! -//! let peer_id = PeerID::random(); +//! let key_pair = KeyPair::generate(&KeyPairType::Ed25519); //! //! // Create the configuration for the backend. //! let mut config = Config::default(); //! -//! //! // Create a new Executor //! let ex = Arc::new(Executor::new()); //! //! // Create a new Backend -//! let backend = Backend::new(peer_id, config, ex.clone()); +//! let backend = Backend::new(&key_pair, config, ex.clone()); //! //! let task = async { //! // Run the backend @@ -36,12 +36,12 @@ //! ``` //! mod backend; +mod codec; mod config; mod connection; mod connector; mod discovery; mod error; -mod io_codec; mod listener; mod message; mod peer; @@ -49,7 +49,8 @@ mod peer_pool; mod protocols; mod routing_table; mod slots; -mod utils; +mod tls_config; +mod version; /// Responsible for network and system monitoring. /// [`Read More`](./monitor/struct.Monitor.html) @@ -62,6 +63,6 @@ pub use backend::{ArcBackend, Backend}; pub use config::Config; pub use error::Error as P2pError; pub use peer::{ArcPeer, PeerID}; -pub use utils::Version; +pub use version::Version; use error::{Error, Result}; diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index f2391f7..58a0931 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -1,28 +1,36 @@ use std::{future::Future, sync::Arc}; -use log::{error, info, trace}; +use log::{debug, error, info}; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, + async_util::{TaskGroup, TaskResult}, + key_pair::KeyPair, GlobalExecutor, }; -use karyons_net::{listen, Conn, Endpoint, Listener as NetListener}; +use karyons_net::{listen, tls, Conn, Endpoint, Listener as NetListener}; use crate::{ monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, - Result, + tls_config::tls_server_config, + Error, Result, }; /// Responsible for creating inbound connections with other peers. pub struct Listener { + /// Identity Key pair + key_pair: KeyPair, + /// Managing spawned tasks. task_group: TaskGroup<'static>, /// Manages available inbound slots. connection_slots: Arc<ConnectionSlots>, + /// Enables secure connection. + enable_tls: bool, + /// Responsible for network and system monitoring. monitor: Arc<Monitor>, } @@ -30,13 +38,17 @@ pub struct Listener { impl Listener { /// Creates a new Listener pub fn new( + key_pair: &KeyPair, connection_slots: Arc<ConnectionSlots>, + enable_tls: bool, monitor: Arc<Monitor>, ex: GlobalExecutor, ) -> Arc<Self> { Arc::new(Self { + key_pair: key_pair.clone(), connection_slots, task_group: TaskGroup::new(ex), + enable_tls, monitor, }) } @@ -55,7 +67,7 @@ impl Listener { where Fut: Future<Output = Result<()>> + Send + 'static, { - let listener = match listen(&endpoint).await { + let listener = match self.listend(&endpoint).await { Ok(listener) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -67,21 +79,17 @@ impl Listener { self.monitor .notify(&ConnEvent::ListenFailed(endpoint).into()) .await; - return Err(err.into()); + return Err(err); } }; let resolved_endpoint = listener.local_endpoint()?; - info!("Start listening on {endpoint}"); + info!("Start listening on {resolved_endpoint}"); let selfc = self.clone(); self.task_group - .spawn(selfc.listen_loop(listener, callback), |res| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Listen loop stopped: {endpoint} {err}"); - } - }); + .spawn(selfc.listen_loop(listener, callback), |_| async {}); Ok(resolved_endpoint) } @@ -94,8 +102,7 @@ impl Listener { self: Arc<Self>, listener: Box<dyn NetListener>, callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, - ) -> Result<()> - where + ) where Fut: Future<Output = Result<()>> + Send + 'static, { loop { @@ -103,27 +110,35 @@ impl Listener { self.connection_slots.wait_for_slot().await; let result = listener.accept().await; - let conn = match result { + let (conn, endpoint) = match result { Ok(c) => { + let endpoint = match c.peer_endpoint() { + Ok(e) => e, + Err(err) => { + self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; + error!("Failed to accept a new connection: {err}"); + continue; + } + }; + self.monitor - .notify(&ConnEvent::Accepted(c.peer_endpoint()?).into()) + .notify(&ConnEvent::Accepted(endpoint.clone()).into()) .await; - c + (c, endpoint) } Err(err) => { error!("Failed to accept a new connection: {err}"); self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; - return Err(err.into()); + continue; } }; self.connection_slots.add(); let selfc = self.clone(); - let endpoint = conn.peer_endpoint()?; let on_disconnect = |res| async move { if let TaskResult::Completed(Err(err)) = res { - trace!("Inbound connection dropped: {err}"); + debug!("Inbound connection dropped: {err}"); } selfc .monitor @@ -136,4 +151,14 @@ impl Listener { self.task_group.spawn(callback(conn), on_disconnect); } } + + async fn listend(&self, endpoint: &Endpoint) -> Result<Box<dyn NetListener>> { + if self.enable_tls { + let tls_config = tls_server_config(&self.key_pair)?; + tls::listen(endpoint, tls_config).await + } else { + listen(endpoint).await + } + .map_err(Error::KaryonsNet) + } } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 3779cc1..6b23322 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -4,7 +4,7 @@ use bincode::{Decode, Encode}; use karyons_net::{Addr, Port}; -use crate::{protocol::ProtocolID, routing_table::Entry, utils::VersionInt, PeerID}; +use crate::{protocol::ProtocolID, routing_table::Entry, version::VersionInt, PeerID}; /// The size of the message header, in bytes. pub const MSG_HEADER_SIZE: usize = 6; diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index fbbf43f..1f74503 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -17,6 +17,7 @@ use karyons_net::Endpoint; /// /// use smol::Executor; /// +/// use karyons_core::key_pair::{KeyPair, KeyPairType}; /// use karyons_p2p::{Config, Backend, PeerID}; /// /// async { @@ -24,7 +25,8 @@ use karyons_net::Endpoint; /// // Create a new Executor /// let ex = Arc::new(Executor::new()); /// -/// let backend = Backend::new(PeerID::random(), Config::default(), ex); +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); +/// let backend = Backend::new(&key_pair, Config::default(), ex); /// /// // Create a new Subscription /// let sub = backend.monitor().await; diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 85cd558..6ed0dd8 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -11,17 +11,17 @@ use smol::{ }; use karyons_core::{ - async_utils::{select, Either, TaskGroup, TaskResult}, + async_util::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, - utils::{decode, encode}, + util::{decode, encode}, GlobalExecutor, }; use karyons_net::Endpoint; use crate::{ + codec::{Codec, CodecMsg}, connection::ConnDirection, - io_codec::{CodecMsg, IOCodec}, message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, peer_pool::{ArcPeerPool, WeakPeerPool}, protocol::{Protocol, ProtocolEvent, ProtocolID}, @@ -37,8 +37,8 @@ pub struct Peer { /// A weak pointer to `PeerPool` peer_pool: WeakPeerPool, - /// Holds the IOCodec for the peer connection - io_codec: IOCodec, + /// Holds the Codec for the peer connection + codec: Codec, /// Remote endpoint for the peer remote_endpoint: Endpoint, @@ -64,7 +64,7 @@ impl Peer { pub fn new( peer_pool: WeakPeerPool, id: &PeerID, - io_codec: IOCodec, + codec: Codec, remote_endpoint: Endpoint, conn_direction: ConnDirection, ex: GlobalExecutor, @@ -72,7 +72,7 @@ impl Peer { Arc::new(Peer { id: id.clone(), peer_pool, - io_codec, + codec, protocol_ids: RwLock::new(Vec::new()), remote_endpoint, conn_direction, @@ -97,7 +97,7 @@ impl Peer { payload: payload.to_vec(), }; - self.io_codec.write(NetMsgCmd::Protocol, &proto_msg).await?; + self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?; Ok(()) } @@ -124,10 +124,7 @@ impl Peer { let _ = self.stop_chan.0.try_send(Ok(())); // No need to handle the error here - let _ = self - .io_codec - .write(NetMsgCmd::Shutdown, &ShutdownMsg(0)) - .await; + let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await; // Force shutting down self.task_group.cancel().await; @@ -174,7 +171,7 @@ impl Peer { /// Start a read loop to handle incoming messages from the peer connection. async fn read_loop(&self) -> Result<()> { loop { - let fut = select(self.stop_chan.1.recv(), self.io_codec.read()).await; + let fut = select(self.stop_chan.1.recv(), self.codec.read()).await; let result = match fut { Either::Left(stop_signal) => { trace!("Peer {} received a stop signal", self.id); diff --git a/p2p/src/peer/peer_id.rs b/p2p/src/peer/peer_id.rs index c8aec7d..903d827 100644 --- a/p2p/src/peer/peer_id.rs +++ b/p2p/src/peer/peer_id.rs @@ -2,6 +2,10 @@ use bincode::{Decode, Encode}; use rand::{rngs::OsRng, RngCore}; use sha2::{Digest, Sha256}; +use karyons_core::key_pair::PublicKey; + +use crate::Error; + /// Represents a unique identifier for a peer. #[derive(Clone, Debug, Eq, PartialEq, Hash, Decode, Encode)] pub struct PeerID(pub [u8; 32]); @@ -39,3 +43,16 @@ impl From<[u8; 32]> for PeerID { PeerID(b) } } + +impl TryFrom<PublicKey> for PeerID { + type Error = Error; + + fn try_from(pk: PublicKey) -> Result<Self, Self::Error> { + let pk: [u8; 32] = pk + .as_bytes() + .try_into() + .map_err(|_| Error::TryFromPublicKey("Failed to convert public key to [u8;32]"))?; + + Ok(PeerID(pk)) + } +} diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index a0079f2..dd7e669 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -11,23 +11,23 @@ use smol::{ }; use karyons_core::{ - async_utils::{TaskGroup, TaskResult}, - utils::decode, + async_util::{TaskGroup, TaskResult}, + util::decode, GlobalExecutor, }; use karyons_net::Conn; use crate::{ + codec::{Codec, CodecMsg}, config::Config, connection::{ConnDirection, ConnQueue}, - io_codec::{CodecMsg, IOCodec}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, monitor::{Monitor, PeerPoolEvent}, peer::{ArcPeer, Peer, PeerID}, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, - utils::{version_match, Version, VersionInt}, + version::{version_match, Version, VersionInt}, Error, Result, }; @@ -155,10 +155,10 @@ impl PeerPool { disconnect_signal: Sender<Result<()>>, ) -> Result<()> { let endpoint = conn.peer_endpoint()?; - let io_codec = IOCodec::new(conn); + let codec = Codec::new(conn); // Do a handshake with the connection before creating a new peer. - let pid = self.do_handshake(&io_codec, conn_direction).await?; + let pid = self.do_handshake(&codec, conn_direction).await?; // TODO: Consider restricting the subnet for inbound connections if self.contains_peer(&pid).await { @@ -169,7 +169,7 @@ impl PeerPool { let peer = Peer::new( Arc::downgrade(self), &pid, - io_codec, + codec, endpoint.clone(), conn_direction.clone(), self.executor.clone(), @@ -235,20 +235,16 @@ impl PeerPool { } /// Initiate a handshake with a connection. - async fn do_handshake( - &self, - io_codec: &IOCodec, - conn_direction: &ConnDirection, - ) -> Result<PeerID> { + async fn do_handshake(&self, codec: &Codec, conn_direction: &ConnDirection) -> Result<PeerID> { match conn_direction { ConnDirection::Inbound => { - let result = self.wait_vermsg(io_codec).await; + let result = self.wait_vermsg(codec).await; match result { Ok(_) => { - self.send_verack(io_codec, true).await?; + self.send_verack(codec, true).await?; } Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => { - self.send_verack(io_codec, false).await?; + self.send_verack(codec, false).await?; } _ => {} } @@ -256,14 +252,14 @@ impl PeerPool { } ConnDirection::Outbound => { - self.send_vermsg(io_codec).await?; - self.wait_verack(io_codec).await + self.send_vermsg(codec).await?; + self.wait_verack(codec).await } } } /// Send a Version message - async fn send_vermsg(&self, io_codec: &IOCodec) -> Result<()> { + async fn send_vermsg(&self, codec: &Codec) -> Result<()> { let pids = self.protocol_versions.read().await; let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect(); drop(pids); @@ -275,16 +271,16 @@ impl PeerPool { }; trace!("Send VerMsg"); - io_codec.write(NetMsgCmd::Version, &vermsg).await?; + codec.write(NetMsgCmd::Version, &vermsg).await?; Ok(()) } /// Wait for a Version message /// /// Returns the peer's ID upon successfully receiving the Version message. - async fn wait_vermsg(&self, io_codec: &IOCodec) -> Result<PeerID> { + async fn wait_vermsg(&self, codec: &Codec) -> Result<PeerID> { let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; + let msg: NetMsg = codec.read_timeout(timeout).await?; let payload = get_msg_payload!(Version, msg); let (vermsg, _) = decode::<VerMsg>(&payload)?; @@ -300,23 +296,23 @@ impl PeerPool { } /// Send a Verack message - async fn send_verack(&self, io_codec: &IOCodec, ack: bool) -> Result<()> { + async fn send_verack(&self, codec: &Codec, ack: bool) -> Result<()> { let verack = VerAckMsg { peer_id: self.id.clone(), ack, }; trace!("Send VerAckMsg {:?}", verack); - io_codec.write(NetMsgCmd::Verack, &verack).await?; + codec.write(NetMsgCmd::Verack, &verack).await?; Ok(()) } /// Wait for a Verack message /// /// Returns the peer's ID upon successfully receiving the Verack message. - async fn wait_verack(&self, io_codec: &IOCodec) -> Result<PeerID> { + async fn wait_verack(&self, codec: &Codec) -> Result<PeerID> { let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = io_codec.read_timeout(timeout).await?; + let msg: NetMsg = codec.read_timeout(timeout).await?; let payload = get_msg_payload!(Verack, msg); let (verack, _) = decode::<VerAckMsg>(&payload)?; diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 770b695..8ddc685 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use karyons_core::{event::EventValue, Executor}; -use crate::{peer::ArcPeer, utils::Version, Result}; +use crate::{peer::ArcPeer, version::Version, Result}; pub type ArcProtocol = Arc<dyn Protocol>; @@ -37,6 +37,7 @@ impl EventValue for ProtocolEvent { /// use async_trait::async_trait; /// use smol::Executor; /// +/// use karyons_core::key_pair::{KeyPair, KeyPairType}; /// use karyons_p2p::{ /// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent}, /// Backend, PeerID, Config, Version, P2pError, ArcPeer}; @@ -84,14 +85,14 @@ impl EventValue for ProtocolEvent { /// } /// /// async { -/// let peer_id = PeerID::random(); +/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); /// let config = Config::default(); /// /// // Create a new Executor /// let ex = Arc::new(Executor::new()); /// /// // Create a new Backend -/// let backend = Backend::new(peer_id, config, ex); +/// let backend = Backend::new(&key_pair, config, ex); /// /// // Attach the NewProtocol /// let c = move |peer| NewProtocol::new(peer); diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index dc1b9a1..0a5488d 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -12,9 +12,9 @@ use smol::{ }; use karyons_core::{ - async_utils::{select, timeout, Either, TaskGroup, TaskResult}, + async_util::{select, timeout, Either, TaskGroup, TaskResult}, event::EventListener, - utils::decode, + util::decode, Executor, }; @@ -23,7 +23,7 @@ use karyons_net::NetError; use crate::{ peer::ArcPeer, protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID}, - utils::Version, + version::Version, Result, }; diff --git a/p2p/src/routing_table/entry.rs b/p2p/src/routing_table/entry.rs index b3f219f..c5fa65d 100644 --- a/p2p/src/routing_table/entry.rs +++ b/p2p/src/routing_table/entry.rs @@ -20,7 +20,7 @@ pub struct Entry { impl PartialEq for Entry { fn eq(&self, other: &Self) -> bool { - // XXX this should also compare both addresses (the self.addr == other.addr) + // TODO: this should also compare both addresses (the self.addr == other.addr) self.key == other.key } } diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index 5277c0a..cfc3128 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -1,5 +1,8 @@ +use std::net::IpAddr; + mod bucket; mod entry; + pub use bucket::{ Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY, UNSTABLE_ENTRY, @@ -8,7 +11,7 @@ pub use entry::{xor_distance, Entry, Key}; use rand::{rngs::OsRng, seq::SliceRandom}; -use crate::utils::subnet_match; +use karyons_net::Addr; use bucket::BUCKET_SIZE; use entry::KEY_SIZE; @@ -262,6 +265,20 @@ impl RoutingTable { } } +/// Check if two addresses belong to the same subnet. +pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { + match (addr, other_addr) { + (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => { + // TODO: Consider moving this to a different place + if other_ip.is_loopback() && ip.is_loopback() { + return false; + } + ip.octets()[0..3] == other_ip.octets()[0..3] + } + _ => false, + } +} + #[cfg(test)] mod tests { use super::bucket::ALL_ENTRY; diff --git a/p2p/src/slots.rs b/p2p/src/slots.rs index 99f0a78..d3a1d0a 100644 --- a/p2p/src/slots.rs +++ b/p2p/src/slots.rs @@ -1,6 +1,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; -use karyons_core::async_utils::CondWait; +use karyons_core::async_util::CondWait; /// Manages available inbound and outbound slots. pub struct ConnectionSlots { diff --git a/p2p/src/tls_config.rs b/p2p/src/tls_config.rs new file mode 100644 index 0000000..f3b231a --- /dev/null +++ b/p2p/src/tls_config.rs @@ -0,0 +1,214 @@ +use std::sync::Arc; + +use async_rustls::rustls::{ + self, cipher_suite::TLS13_CHACHA20_POLY1305_SHA256, client::ServerCertVerifier, + server::ClientCertVerifier, Certificate, CertificateError, Error::InvalidCertificate, + PrivateKey, SupportedCipherSuite, SupportedKxGroup, SupportedProtocolVersion, +}; +use log::error; +use x509_parser::{certificate::X509Certificate, parse_x509_certificate}; + +use karyons_core::key_pair::{KeyPair, KeyPairType, PublicKey}; + +use crate::{PeerID, Result}; + +// NOTE: This code needs a comprehensive audit. + +static PROTOCOL_VERSIONS: &[&SupportedProtocolVersion] = &[&rustls::version::TLS13]; +static CIPHER_SUITES: &[SupportedCipherSuite] = &[TLS13_CHACHA20_POLY1305_SHA256]; +static KX_GROUPS: &[&SupportedKxGroup] = &[&rustls::kx_group::X25519]; + +const BAD_SIGNATURE_ERR: rustls::Error = InvalidCertificate(CertificateError::BadSignature); +const BAD_ENCODING_ERR: rustls::Error = InvalidCertificate(CertificateError::BadEncoding); + +/// Returns a TLS client configuration. +pub fn tls_client_config( + key_pair: &KeyPair, + peer_id: Option<PeerID>, +) -> Result<rustls::ClientConfig> { + let (cert, private_key) = generate_cert(key_pair)?; + let server_verifier = SrvrCertVerifier { peer_id }; + let client_config = rustls::ClientConfig::builder() + .with_cipher_suites(CIPHER_SUITES) + .with_kx_groups(KX_GROUPS) + .with_protocol_versions(PROTOCOL_VERSIONS)? + .with_custom_certificate_verifier(Arc::new(server_verifier)) + .with_client_auth_cert(vec![cert], private_key)?; + + Ok(client_config) +} + +/// Returns a TLS server configuration. +pub fn tls_server_config(key_pair: &KeyPair) -> Result<rustls::ServerConfig> { + let (cert, private_key) = generate_cert(key_pair)?; + let client_verifier = CliCertVerifier {}; + let server_config = rustls::ServerConfig::builder() + .with_cipher_suites(CIPHER_SUITES) + .with_kx_groups(KX_GROUPS) + .with_protocol_versions(PROTOCOL_VERSIONS)? + .with_client_cert_verifier(Arc::new(client_verifier)) + .with_single_cert(vec![cert], private_key)?; + + Ok(server_config) +} + +/// Generates a certificate and returns both the certificate and the private key. +fn generate_cert(key_pair: &KeyPair) -> Result<(Certificate, PrivateKey)> { + let cert_key_pair = rcgen::KeyPair::generate(&rcgen::PKCS_ED25519)?; + let private_key = rustls::PrivateKey(cert_key_pair.serialize_der()); + + // Add a custom extension to the certificate: + // - Sign the certificate's public key with the provided key pair's public key + // - Append both the signature and the key pair's public key to the extension + let signature = key_pair.sign(&cert_key_pair.public_key_der()); + let ext_content = yasna::encode_der(&(key_pair.public().as_bytes().to_vec(), signature)); + // XXX: Not sure about the oid number ??? + let mut ext = rcgen::CustomExtension::from_oid_content(&[0, 0, 0, 0], ext_content); + ext.set_criticality(true); + + let mut params = rcgen::CertificateParams::new(vec![]); + params.alg = &rcgen::PKCS_ED25519; + params.key_pair = Some(cert_key_pair); + params.custom_extensions.push(ext); + + let cert = rustls::Certificate(rcgen::Certificate::from_params(params)?.serialize_der()?); + Ok((cert, private_key)) +} + +/// Verifies the given certification. +fn verify_cert(end_entity: &Certificate) -> std::result::Result<PeerID, rustls::Error> { + // Parse the certificate. + let cert = parse_cert(end_entity)?; + + match cert.extensions().first() { + Some(ext) => { + // Extract the peer id (public key) and the signature from the extension. + let (public_key, signature): (Vec<u8>, Vec<u8>) = + yasna::decode_der(ext.value).map_err(|_| BAD_ENCODING_ERR)?; + + // Use the peer id (public key) to verify the extracted signature. + let public_key = PublicKey::from_bytes(&KeyPairType::Ed25519, &public_key) + .map_err(|_| BAD_ENCODING_ERR)?; + public_key + .verify(cert.public_key().raw, &signature) + .map_err(|_| BAD_SIGNATURE_ERR)?; + + // Verify the certificate signature. + verify_cert_signature( + &cert, + cert.tbs_certificate.as_ref(), + cert.signature_value.as_ref(), + )?; + + PeerID::try_from(public_key).map_err(|_| BAD_ENCODING_ERR) + } + None => Err(BAD_ENCODING_ERR), + } +} + +/// Parses the given x509 certificate. +fn parse_cert(end_entity: &Certificate) -> std::result::Result<X509Certificate, rustls::Error> { + let (_, cert) = parse_x509_certificate(end_entity.as_ref()).map_err(|_| BAD_ENCODING_ERR)?; + + if !cert.validity().is_valid() { + return Err(InvalidCertificate(CertificateError::NotValidYet)); + } + + Ok(cert) +} + +/// Verifies the signature of the given certificate. +fn verify_cert_signature( + cert: &X509Certificate, + message: &[u8], + signature: &[u8], +) -> std::result::Result<(), rustls::Error> { + let public_key = PublicKey::from_bytes( + &KeyPairType::Ed25519, + cert.tbs_certificate.subject_pki.subject_public_key.as_ref(), + ) + .map_err(|_| BAD_ENCODING_ERR)?; + + public_key + .verify(message, signature) + .map_err(|_| BAD_SIGNATURE_ERR) +} + +struct SrvrCertVerifier { + peer_id: Option<PeerID>, +} + +impl ServerCertVerifier for SrvrCertVerifier { + fn verify_server_cert( + &self, + end_entity: &Certificate, + _intermediates: &[Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator<Item = &[u8]>, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> std::result::Result<rustls::client::ServerCertVerified, rustls::Error> { + let peer_id = match verify_cert(end_entity) { + Ok(pid) => pid, + Err(err) => { + error!("Failed to verify cert: {err}"); + return Err(err); + } + }; + + // Verify that the peer id in the certificate's extension matches the + // one the client intends to connect to. + // Both should be equal for establishing a fully secure connection. + if let Some(pid) = &self.peer_id { + if pid != &peer_id { + return Err(InvalidCertificate( + CertificateError::ApplicationVerificationFailure, + )); + } + } + + Ok(rustls::client::ServerCertVerified::assertion()) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &Certificate, + dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result<rustls::client::HandshakeSignatureValid, rustls::Error> { + let cert = parse_cert(cert)?; + verify_cert_signature(&cert, message, dss.signature())?; + Ok(rustls::client::HandshakeSignatureValid::assertion()) + } +} + +struct CliCertVerifier {} +impl ClientCertVerifier for CliCertVerifier { + fn verify_client_cert( + &self, + end_entity: &Certificate, + _intermediates: &[Certificate], + _now: std::time::SystemTime, + ) -> std::result::Result<rustls::server::ClientCertVerified, rustls::Error> { + if let Err(err) = verify_cert(end_entity) { + error!("Failed to verify cert: {err}"); + return Err(err); + }; + Ok(rustls::server::ClientCertVerified::assertion()) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &Certificate, + dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result<rustls::client::HandshakeSignatureValid, rustls::Error> { + let cert = parse_cert(cert)?; + verify_cert_signature(&cert, message, dss.signature())?; + Ok(rustls::client::HandshakeSignatureValid::assertion()) + } + + fn client_auth_root_subjects(&self) -> &[rustls::DistinguishedName] { + &[] + } +} diff --git a/p2p/src/utils/mod.rs b/p2p/src/utils/mod.rs deleted file mode 100644 index e8ff9d0..0000000 --- a/p2p/src/utils/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod version; - -pub use version::{version_match, Version, VersionInt}; - -use std::net::IpAddr; - -use karyons_net::Addr; - -/// Check if two addresses belong to the same subnet. -pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { - match (addr, other_addr) { - (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => { - // XXX Consider moving this to a different location - if other_ip.is_loopback() && ip.is_loopback() { - return false; - } - ip.octets()[0..3] == other_ip.octets()[0..3] - } - _ => false, - } -} diff --git a/p2p/src/utils/version.rs b/p2p/src/version.rs index a101b28..a101b28 100644 --- a/p2p/src/utils/version.rs +++ b/p2p/src/version.rs |