aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock12
-rw-r--r--Cargo.toml56
-rw-r--r--core/Cargo.toml31
-rw-r--r--core/src/async_util/condvar.rs5
-rw-r--r--core/src/async_util/task_group.rs2
-rw-r--r--core/src/lib.rs1
-rw-r--r--jsonrpc/Cargo.toml41
-rw-r--r--jsonrpc/impl/Cargo.toml8
-rw-r--r--jsonrpc/impl/src/lib.rs12
-rw-r--r--jsonrpc/src/server/pubsub_service.rs2
-rw-r--r--jsonrpc/src/server/service.rs2
-rw-r--r--net/Cargo.toml48
-rw-r--r--net/async_rustls/Cargo.toml5
-rw-r--r--net/src/endpoint.rs148
-rw-r--r--p2p/Cargo.toml54
-rw-r--r--p2p/src/connection.rs3
-rw-r--r--p2p/src/connector.rs8
-rw-r--r--p2p/src/discovery/lookup.rs29
-rw-r--r--p2p/src/discovery/mod.rs1
-rw-r--r--p2p/src/error.rs3
-rw-r--r--p2p/src/listener.rs8
21 files changed, 284 insertions, 195 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 639e8ce..83dfa16 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -344,9 +344,9 @@ dependencies = [
[[package]]
name = "async-tungstenite"
-version = "0.26.2"
+version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb786dab48e539c5f17b23bac20d812ac027c01732ed7c7b58850c69a684e46c"
+checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b"
dependencies = [
"async-std",
"futures-io",
@@ -1871,9 +1871,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
-version = "1.7.0"
+version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
+checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55"
[[package]]
name = "rustls-webpki"
@@ -2209,9 +2209,9 @@ checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
[[package]]
name = "tungstenite"
-version = "0.23.0"
+version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8"
+checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes",
diff --git a/Cargo.toml b/Cargo.toml
index b74b876..1cbc80e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
resolver = "2"
# Please ensure that each crate comes before any other crate that depends on it
-members = ["core", "net", "p2p", "jsonrpc"]
+members = ["core", "net", "net/async_rustls", "jsonrpc", "jsonrpc/impl", "p2p"]
[workspace.package]
description = "A library for building p2p, decentralized, and collaborative software"
@@ -14,7 +14,53 @@ license = "GPL-3.0"
authors = ["hozan23 <hozan23@karyontech.net>"]
[workspace.dependencies]
-karyon_core = { path = "core", default-features = false }
-karyon_net = { path = "net", default-features = false }
-karyon_jsonrpc = { path = "jsonrpc", default-features = false }
-karyon_p2p = { path = "p2p", default-features = false }
+karyon_core = { path = "core", version = "0.1.6", default-features = false }
+
+karyon_net = { path = "net", version = "0.1.6", default-features = false }
+karyon_async_rustls = { path = "net/async_rustls", version = "0.1.6", default-features = false }
+
+karyon_jsonrpc = { path = "jsonrpc", version = "0.1.6", default-features = false }
+karyon_jsonrpc_macro = { path = "jsonrpc/impl", version = "0.1.6", default-features = false }
+
+karyon_p2p = { path = "p2p", version = "0.1.6", default-features = false }
+
+log = "0.4"
+thiserror = "1.0"
+chrono = "0.4"
+rand = "0.8"
+url = "2.5"
+parking_lot = "0.12"
+once_cell = "1.19"
+semver = "1.0"
+sha2 = "0.10"
+
+# async
+async-channel = "2.3"
+async-trait = "0.1"
+pin-project-lite = "0.2"
+async-process = "2.2"
+smol = "2.0"
+tokio = "1.38"
+futures-util = { version = "0.3", default-features = false }
+
+# encode
+bincode = "2.0.0-rc.3"
+serde = "1.0"
+serde_json = "1.0"
+base64 = "0.22"
+
+# macros
+proc-macro2 = "1.0"
+quote = "1.0"
+syn = "2.0"
+
+# websocket
+async-tungstenite = { version = "0.28", default-features = false }
+
+# tls
+rustls-pki-types = "1.9"
+futures-rustls = "0.26"
+tokio-rustls = "0.26"
+rcgen = "0.13"
+yasna = "0.5"
+x509-parser = "0.16"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 895ddf6..a43537b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -15,26 +15,23 @@ tokio = ["dep:tokio"]
smol = ["dep:smol", "async-process"]
[dependencies]
-log = "0.4.21"
-thiserror = "1.0.61"
-chrono = "0.4.38"
-rand = "0.8.5"
+log = { workspace = true }
+thiserror = { workspace = true }
+chrono = { workspace = true }
+rand = { workspace = true }
+parking_lot = { workspace = true }
dirs = "5.0.1"
-parking_lot = "0.12.3"
-once_cell = "1.19.0"
+once_cell = { workspace = true }
-# async
-async-channel = "2.3.1"
-pin-project-lite = "0.2.14"
-async-process = { version = "2.2.3", optional = true }
-smol = { version = "2.0.0", optional = true }
-tokio = { version = "1.38.0", features = ["full"], optional = true }
-futures-util = { version = "0.3.5", features = [
- "alloc",
+async-channel = { workspace = true }
+pin-project-lite = { workspace = true }
+async-process = { workspace = true, optional = true }
+smol = { workspace = true, optional = true }
+tokio = { workspace = true, features = ["full"], optional = true }
+futures-util = { workspace = true, features = [
+ "alloc",
], default-features = false }
-# encode
-bincode = "2.0.0-rc.3"
+bincode = { workspace = true }
-# crypto
ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true }
diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs
index e425eda..44705c6 100644
--- a/core/src/async_util/condvar.rs
+++ b/core/src/async_util/condvar.rs
@@ -58,7 +58,6 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16};
/// };
///
/// ```
-
pub struct CondVar {
inner: Mutex<Wakers>,
}
@@ -116,7 +115,7 @@ impl<'a, T> CondVarAwait<'a, T> {
}
}
-impl<'a, T> Future for CondVarAwait<'a, T> {
+impl<T> Future for CondVarAwait<'_, T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -155,7 +154,7 @@ impl<'a, T> Future for CondVarAwait<'a, T> {
}
}
-impl<'a, T> Drop for CondVarAwait<'a, T> {
+impl<T> Drop for CondVarAwait<'_, T> {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut inner = self.condvar.inner.lock();
diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs
index c55b9a1..39bbf5c 100644
--- a/core/src/async_util/task_group.rs
+++ b/core/src/async_util/task_group.rs
@@ -126,7 +126,7 @@ pub struct TaskHandler {
cancel_flag: Arc<CondWait>,
}
-impl<'a> TaskHandler {
+impl TaskHandler {
/// Creates a new task handler
fn new<T, Fut, CallbackF, CallbackFut>(
ex: Executor,
diff --git a/core/src/lib.rs b/core/src/lib.rs
index a7192d9..fedb226 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -23,7 +23,6 @@ pub mod pubsub;
pub mod async_runtime;
#[cfg(feature = "crypto")]
-
/// Collects common cryptographic tools
pub mod crypto;
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml
index 2260b7e..bdbcb74 100644
--- a/jsonrpc/Cargo.toml
+++ b/jsonrpc/Cargo.toml
@@ -17,39 +17,38 @@ tls = ["tcp", "karyon_net/tls"]
ws = ["tcp", "karyon_net/ws", "async-tungstenite"]
unix = ["karyon_net/unix"]
smol = [
- "karyon_core/smol",
- "karyon_net/smol",
- "karyon_jsonrpc_macro/smol",
- "async-tungstenite?/async-std-runtime",
+ "karyon_core/smol",
+ "karyon_net/smol",
+ "karyon_jsonrpc_macro/smol",
+ "async-tungstenite?/async-std-runtime",
]
tokio = [
- "karyon_core/tokio",
- "karyon_net/tokio",
- "karyon_jsonrpc_macro/tokio",
- "async-tungstenite?/tokio-runtime",
+ "karyon_core/tokio",
+ "karyon_net/tokio",
+ "karyon_jsonrpc_macro/tokio",
+ "async-tungstenite?/tokio-runtime",
]
[dependencies]
-karyon_core = { version = "0.1.6", path = "../core", default-features = false }
-karyon_net = { version = "0.1.6", path = "../net", default-features = false }
+karyon_core = { workspace = true }
+karyon_net = { workspace = true }
+karyon_jsonrpc_macro = { workspace = true }
-karyon_jsonrpc_macro = { version = "0.1.6", path = "impl", default-features = false }
-
-log = "0.4.21"
-rand = "0.8.5"
-thiserror = "1.0.61"
+log = { workspace = true }
+rand = { workspace = true }
+thiserror = { workspace = true }
# encode/decode
-serde = { version = "1.0.203", features = ["derive"] }
-serde_json = "1.0.117"
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
# async
-async-trait = "0.1.80"
-async-channel = "2.3.1"
+async-trait = { workspace = true }
+async-channel = { workspace = true }
# websocket
-async-tungstenite = { version = "0.26.2", default-features = false, optional = true }
+async-tungstenite = { workspace = true, optional = true }
[dev-dependencies]
+smol = { workspace = true }
env_logger = "0.11.3"
-smol = "2.0.0"
diff --git a/jsonrpc/impl/Cargo.toml b/jsonrpc/impl/Cargo.toml
index cf001e1..0bf7890 100644
--- a/jsonrpc/impl/Cargo.toml
+++ b/jsonrpc/impl/Cargo.toml
@@ -19,8 +19,8 @@ smol = []
tokio = []
[dependencies]
-proc-macro2 = "1.0"
-quote = "1.0"
-syn = { version = "2.0", features = ["full"] }
+proc-macro2 = { workspace = true }
+quote = { workspace = true }
+syn = { workspace = true, features = ["full"] }
-serde_json = "1.0.117"
+serde_json = { workspace = true }
diff --git a/jsonrpc/impl/src/lib.rs b/jsonrpc/impl/src/lib.rs
index 8814e61..d7ea466 100644
--- a/jsonrpc/impl/src/lib.rs
+++ b/jsonrpc/impl/src/lib.rs
@@ -54,9 +54,9 @@ pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
let item: TokenStream2 = item.into();
quote! {
impl karyon_jsonrpc::RPCService for #self_ty {
- fn get_method<'a>(
- &'a self,
- name: &'a str
+ fn get_method(
+ &self,
+ name: &str
) -> Option<karyon_jsonrpc::RPCMethod> {
match name {
#(#impl_methods)*
@@ -115,9 +115,9 @@ pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
let item: TokenStream2 = item.into();
quote! {
impl karyon_jsonrpc::PubSubRPCService for #self_ty {
- fn get_pubsub_method<'a>(
- &'a self,
- name: &'a str
+ fn get_pubsub_method(
+ &self,
+ name: &str
) -> Option<karyon_jsonrpc::PubSubRPCMethod> {
match name {
#(#impl_methods)*
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index 909b0b0..0775880 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -12,6 +12,6 @@ type PubSubRPCMethodOutput<'a> =
/// Defines the interface for an RPC service.
pub trait PubSubRPCService: Sync + Send {
- fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option<PubSubRPCMethod>;
+ fn get_pubsub_method(&self, name: &str) -> Option<PubSubRPCMethod>;
fn name(&self) -> String;
}
diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs
index 787da86..5195d2c 100644
--- a/jsonrpc/src/server/service.rs
+++ b/jsonrpc/src/server/service.rs
@@ -9,6 +9,6 @@ type RPCMethodOutput<'a> =
/// Defines the interface for an RPC service.
pub trait RPCService: Sync + Send {
- fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>;
+ fn get_method(&self, name: &str) -> Option<RPCMethod>;
fn name(&self) -> String;
}
diff --git a/net/Cargo.toml b/net/Cargo.toml
index 01c7d3e..1a1a34d 100644
--- a/net/Cargo.toml
+++ b/net/Cargo.toml
@@ -20,44 +20,40 @@ ws = ["tcp", "async-tungstenite"]
udp = []
unix = ["stream"]
smol = [
- "karyon_core/smol",
- "async-tungstenite?/async-std-runtime",
- "karyon_async_rustls?/smol",
+ "karyon_core/smol",
+ "async-tungstenite?/async-std-runtime",
+ "karyon_async_rustls?/smol",
]
tokio = [
- "karyon_core/tokio",
- "async-tungstenite?/tokio-runtime",
- "karyon_async_rustls?/tokio",
- "dep:tokio",
+ "karyon_core/tokio",
+ "async-tungstenite?/tokio-runtime",
+ "karyon_async_rustls?/tokio",
+ "dep:tokio",
]
serde = ["dep:serde"]
[dependencies]
-karyon_core = { version = "0.1.6", path = "../core", default-features = false }
-karyon_async_rustls = { version = "0.1.6", path = "./async_rustls", default-features = false, optional = true }
+karyon_core = { workspace = true }
+karyon_async_rustls = { workspace = true, optional = true }
-log = "0.4.21"
-thiserror = "1.0.61"
-url = "2.5.2"
-
-# encode/decode
-serde = { version = "1.0.203", features = ["derive"], optional = true }
-bincode = { version = "2.0.0-rc.3", features = ["derive"] }
+log = { workspace = true }
+thiserror = { workspace = true }
+url = { workspace = true }
+serde = { workspace = true, features = ["derive"], optional = true }
+bincode = { workspace = true, features = ["derive"] }
# async
-async-trait = "0.1.80"
-async-channel = "2.3.1"
-futures-util = { version = "0.3.30", default-features = false, features = [
- "sink",
-], optional = true }
-pin-project-lite = { version = "0.2.14", optional = true }
-tokio = { version = "1.38.0", features = ["io-util"], optional = true }
+async-trait = { workspace = true }
+async-channel = { workspace = true }
+futures-util = { workspace = true, features = ["sink"], optional = true }
+pin-project-lite = { workspace = true, optional = true }
+tokio = { workspace = true, features = ["io-util"], optional = true }
# websocket
-async-tungstenite = { version = "0.26.2", default-features = false, optional = true }
+async-tungstenite = { workspace = true, optional = true }
# tls
-rustls-pki-types = { version = "1.7.0", optional = true }
+rustls-pki-types = { workspace = true, optional = true }
[dev-dependencies]
-smol = "2.0.0"
+smol = { workspace = true }
diff --git a/net/async_rustls/Cargo.toml b/net/async_rustls/Cargo.toml
index 66ac2cc..871f02e 100644
--- a/net/async_rustls/Cargo.toml
+++ b/net/async_rustls/Cargo.toml
@@ -14,6 +14,5 @@ smol = ["futures-rustls"]
tokio = ["tokio-rustls"]
[dependencies]
-futures-rustls = { version = "0.26.0", optional = true }
-tokio-rustls = { version = "0.26.0", optional = true }
-
+futures-rustls = { workspace = true, optional = true }
+tokio-rustls = { workspace = true, optional = true }
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs
index c3626ec..dff703d 100644
--- a/net/src/endpoint.rs
+++ b/net/src/endpoint.rs
@@ -45,6 +45,98 @@ pub enum Endpoint {
Unix(PathBuf),
}
+impl Endpoint {
+ /// Creates a new TCP endpoint from a `SocketAddr`.
+ pub fn new_tcp_addr(addr: SocketAddr) -> Endpoint {
+ Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new UDP endpoint from a `SocketAddr`.
+ pub fn new_udp_addr(addr: SocketAddr) -> Endpoint {
+ Endpoint::Udp(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new TLS endpoint from a `SocketAddr`.
+ pub fn new_tls_addr(addr: SocketAddr) -> Endpoint {
+ Endpoint::Tls(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new WS endpoint from a `SocketAddr`.
+ pub fn new_ws_addr(addr: SocketAddr) -> Endpoint {
+ Endpoint::Ws(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new WSS endpoint from a `SocketAddr`.
+ pub fn new_wss_addr(addr: SocketAddr) -> Endpoint {
+ Endpoint::Wss(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new Unix endpoint from a `UnixSocketAddr`.
+ pub fn new_unix_addr(addr: &std::path::Path) -> Endpoint {
+ Endpoint::Unix(addr.to_path_buf())
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Tcp`.
+ pub fn is_tcp(&self) -> bool {
+ matches!(self, Endpoint::Tcp(..))
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Tls`.
+ pub fn is_tls(&self) -> bool {
+ matches!(self, Endpoint::Tls(..))
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Ws`.
+ pub fn is_ws(&self) -> bool {
+ matches!(self, Endpoint::Ws(..))
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Wss`.
+ pub fn is_wss(&self) -> bool {
+ matches!(self, Endpoint::Wss(..))
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Udp`.
+ pub fn is_udp(&self) -> bool {
+ matches!(self, Endpoint::Udp(..))
+ }
+
+ #[inline]
+ /// Checks if the `Endpoint` is of type `Unix`.
+ pub fn is_unix(&self) -> bool {
+ matches!(self, Endpoint::Unix(..))
+ }
+
+ /// Returns the `Port` of the endpoint.
+ pub fn port(&self) -> Result<&Port> {
+ match self {
+ Endpoint::Tcp(_, port)
+ | Endpoint::Udp(_, port)
+ | Endpoint::Tls(_, port)
+ | Endpoint::Ws(_, port)
+ | Endpoint::Wss(_, port) => Ok(port),
+ _ => Err(Error::TryFromEndpoint),
+ }
+ }
+
+ /// Returns the `Addr` of the endpoint.
+ pub fn addr(&self) -> Result<&Addr> {
+ match self {
+ Endpoint::Tcp(addr, _)
+ | Endpoint::Udp(addr, _)
+ | Endpoint::Tls(addr, _)
+ | Endpoint::Ws(addr, _)
+ | Endpoint::Wss(addr, _) => Ok(addr),
+ _ => Err(Error::TryFromEndpoint),
+ }
+ }
+}
+
impl std::fmt::Display for Endpoint {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
@@ -152,62 +244,6 @@ impl FromStr for Endpoint {
}
}
-impl Endpoint {
- /// Creates a new TCP endpoint from a `SocketAddr`.
- pub fn new_tcp_addr(addr: SocketAddr) -> Endpoint {
- Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port())
- }
-
- /// Creates a new UDP endpoint from a `SocketAddr`.
- pub fn new_udp_addr(addr: SocketAddr) -> Endpoint {
- Endpoint::Udp(Addr::Ip(addr.ip()), addr.port())
- }
-
- /// Creates a new TLS endpoint from a `SocketAddr`.
- pub fn new_tls_addr(addr: SocketAddr) -> Endpoint {
- Endpoint::Tls(Addr::Ip(addr.ip()), addr.port())
- }
-
- /// Creates a new WS endpoint from a `SocketAddr`.
- pub fn new_ws_addr(addr: SocketAddr) -> Endpoint {
- Endpoint::Ws(Addr::Ip(addr.ip()), addr.port())
- }
-
- /// Creates a new WSS endpoint from a `SocketAddr`.
- pub fn new_wss_addr(addr: SocketAddr) -> Endpoint {
- Endpoint::Wss(Addr::Ip(addr.ip()), addr.port())
- }
-
- /// Creates a new Unix endpoint from a `UnixSocketAddr`.
- pub fn new_unix_addr(addr: &std::path::Path) -> Endpoint {
- Endpoint::Unix(addr.to_path_buf())
- }
-
- /// Returns the `Port` of the endpoint.
- pub fn port(&self) -> Result<&Port> {
- match self {
- Endpoint::Tcp(_, port)
- | Endpoint::Udp(_, port)
- | Endpoint::Tls(_, port)
- | Endpoint::Ws(_, port)
- | Endpoint::Wss(_, port) => Ok(port),
- _ => Err(Error::TryFromEndpoint),
- }
- }
-
- /// Returns the `Addr` of the endpoint.
- pub fn addr(&self) -> Result<&Addr> {
- match self {
- Endpoint::Tcp(addr, _)
- | Endpoint::Udp(addr, _)
- | Endpoint::Tls(addr, _)
- | Endpoint::Ws(addr, _)
- | Endpoint::Wss(addr, _) => Ok(addr),
- _ => Err(Error::TryFromEndpoint),
- }
- }
-}
-
/// Addr defines a type for an address, either IP or domain.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 9c303e9..15f8cac 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -18,50 +18,40 @@ tokio = ["karyon_core/tokio", "karyon_net/tokio", "tokio-rustls"]
serde = ["dep:serde", "karyon_net/serde"]
[dependencies]
-karyon_core = { workspace = true, features = [
- "crypto",
-], default-features = false }
-karyon_net = { workspace = true, default-features = false, features = [
- "tcp",
- "tls",
- "udp",
-] }
+karyon_core = { workspace = true, features = ["crypto"] }
+karyon_net = { workspace = true, features = ["tcp", "tls", "udp"] }
-log = "0.4.21"
-chrono = "0.4.38"
-rand = "0.8.5"
-thiserror = "1.0.61"
-semver = "1.0.23"
-sha2 = "0.10.8"
-parking_lot = "0.12.3"
+log = { workspace = true }
+chrono = { workspace = true }
+rand = { workspace = true }
+thiserror = { workspace = true }
+semver = { workspace = true }
+sha2 = { workspace = true }
+parking_lot = { workspace = true }
# encode/decode
-bincode = { version = "2.0.0-rc.3", features = ["derive"] }
-base64 = "0.22.1"
-serde = { version = "1.0.203", features = ["derive"], optional = true }
+bincode = { workspace = true, features = ["derive"] }
+base64 = { workspace = true }
+serde = { workspace = true, features = ["derive"], optional = true }
# async
-async-trait = "0.1.80"
-async-channel = "2.3.1"
-futures-util = { version = "0.3.5", features = [
- "alloc",
-], default-features = false }
+async-trait = { workspace = true }
+async-channel = { workspace = true }
+futures-util = { workspace = true, features = ["alloc"] }
# tls
-rcgen = "0.13.1"
-yasna = "0.5.2"
-x509-parser = "0.16.0"
-futures-rustls = { version = "0.26.0", features = [
- "aws-lc-rs",
-], optional = true }
-tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true }
-rustls-pki-types = "1.7.0"
+rcgen = { workspace = true }
+yasna = { workspace = true }
+x509-parser = { workspace = true }
+futures-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true }
+tokio-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true }
+rustls-pki-types = { workspace = true }
[dev-dependencies]
+smol = { workspace = true }
blocking = "1.6.1"
clap = { version = "4.5.7", features = ["derive"] }
ctrlc = "3.4.4"
easy-parallel = "3.3.1"
env_logger = "0.11.3"
-smol = "2.0.0"
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index c1a7a8c..f2e9d1e 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -74,8 +74,7 @@ impl Connection {
pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
match self.listeners.get(&P::id()) {
Some(l) => l.recv().await.map_err(Error::from),
- // TODO
- None => todo!(),
+ None => Err(Error::UnsupportedProtocol(P::id())),
}
}
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index cfa661b..98cdfc7 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -148,6 +148,10 @@ impl Connector {
async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ClientTlsConfig {
tcp_config: Default::default(),
client_config: tls_client_config(&self.key_pair, peer_id.clone())?,
@@ -157,6 +161,10 @@ impl Connector {
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 47a1d09..71cc5b8 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -41,6 +41,9 @@ pub struct LookupService {
/// Resolved listen endpoint
listen_endpoint: RwLock<Option<Endpoint>>,
+ /// Resolved discovery endpoint
+ discovery_endpoint: RwLock<Option<Endpoint>>,
+
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -52,7 +55,6 @@ impl LookupService {
/// Creates a new lookup service
pub fn new(
key_pair: &KeyPair,
- id: &PeerID,
table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
@@ -78,13 +80,18 @@ impl LookupService {
ex,
);
+ let id = key_pair
+ .public()
+ .try_into()
+ .expect("Get PeerID from KeyPair");
Self {
- id: id.clone(),
+ id,
table,
listener,
connector,
outbound_slots,
listen_endpoint: RwLock::new(None),
+ discovery_endpoint: RwLock::new(None),
config,
monitor,
}
@@ -98,19 +105,23 @@ impl LookupService {
/// Set the resolved listen endpoint.
pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
- let resolved_endpoint = Endpoint::Tcp(
+ let discovery_endpoint = Endpoint::Tcp(
resolved_endpoint.addr()?.clone(),
self.config.discovery_port,
);
- *self.listen_endpoint.write() = Some(resolved_endpoint);
+ *self.listen_endpoint.write() = Some(resolved_endpoint.clone());
+ *self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
Ok(())
}
- /// Get the listening endpoint.
pub fn listen_endpoint(&self) -> Option<Endpoint> {
self.listen_endpoint.read().clone()
}
+ pub fn discovery_endpoint(&self) -> Option<Endpoint> {
+ self.discovery_endpoint.read().clone()
+ }
+
/// Shuts down the lookup service.
pub async fn shutdown(&self) {
self.connector.shutdown().await;
@@ -204,7 +215,7 @@ impl LookupService {
/// Starts a self lookup
async fn self_lookup(
&self,
- random_peers: &Vec<PeerMsg>,
+ random_peers: &[PeerMsg],
peer_buffer: &mut Vec<PeerMsg>,
) -> Result<()> {
let mut results = FuturesUnordered::new();
@@ -278,7 +289,7 @@ impl LookupService {
trace!("Send Peer msg");
if let Some(endpoint) = self.listen_endpoint() {
- self.send_peer_msg(&conn, endpoint.clone()).await?;
+ self.send_peer_msg(&conn, endpoint).await?;
}
trace!("Send Shutdown msg");
@@ -289,8 +300,8 @@ impl LookupService {
/// Start a listener.
async fn start_listener(self: &Arc<Self>) -> Result<()> {
- let endpoint: Endpoint = match self.listen_endpoint() {
- Some(e) => e.clone(),
+ let endpoint: Endpoint = match self.discovery_endpoint() {
+ Some(e) => e,
None => return Ok(()),
};
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index a81a817..b4c20bf 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -80,7 +80,6 @@ impl Discovery {
let lookup_service = Arc::new(LookupService::new(
key_pair,
- peer_id,
table.clone(),
config.clone(),
monitor.clone(),
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index a490b57..cc30aff 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -11,6 +11,9 @@ pub enum Error {
#[error("Unsupported protocol error: {0}")]
UnsupportedProtocol(String),
+ #[error("Unsupported Endpoint: {0}")]
+ UnsupportedEndpoint(String),
+
#[error("PeerID try from PublicKey Error")]
PeerIDTryFromPublicKey,
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 8a5deaa..347099d 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -157,6 +157,10 @@ impl Listener {
async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ServerTlsConfig {
tcp_config: Default::default(),
server_config: tls_server_config(&self.key_pair)?,
@@ -165,6 +169,10 @@ impl Listener {
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)