diff options
-rw-r--r-- | Cargo.lock | 12 | ||||
-rw-r--r-- | Cargo.toml | 56 | ||||
-rw-r--r-- | core/Cargo.toml | 31 | ||||
-rw-r--r-- | core/src/async_util/condvar.rs | 5 | ||||
-rw-r--r-- | core/src/async_util/task_group.rs | 2 | ||||
-rw-r--r-- | core/src/lib.rs | 1 | ||||
-rw-r--r-- | jsonrpc/Cargo.toml | 41 | ||||
-rw-r--r-- | jsonrpc/impl/Cargo.toml | 8 | ||||
-rw-r--r-- | jsonrpc/impl/src/lib.rs | 12 | ||||
-rw-r--r-- | jsonrpc/src/server/pubsub_service.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/server/service.rs | 2 | ||||
-rw-r--r-- | net/Cargo.toml | 48 | ||||
-rw-r--r-- | net/async_rustls/Cargo.toml | 5 | ||||
-rw-r--r-- | net/src/endpoint.rs | 148 | ||||
-rw-r--r-- | p2p/Cargo.toml | 54 | ||||
-rw-r--r-- | p2p/src/connection.rs | 3 | ||||
-rw-r--r-- | p2p/src/connector.rs | 8 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 29 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 1 | ||||
-rw-r--r-- | p2p/src/error.rs | 3 | ||||
-rw-r--r-- | p2p/src/listener.rs | 8 |
21 files changed, 284 insertions, 195 deletions
@@ -344,9 +344,9 @@ dependencies = [ [[package]] name = "async-tungstenite" -version = "0.26.2" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb786dab48e539c5f17b23bac20d812ac027c01732ed7c7b58850c69a684e46c" +checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b" dependencies = [ "async-std", "futures-io", @@ -1871,9 +1871,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" [[package]] name = "rustls-webpki" @@ -2209,9 +2209,9 @@ checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" [[package]] name = "tungstenite" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" dependencies = [ "byteorder", "bytes", @@ -2,7 +2,7 @@ resolver = "2" # Please ensure that each crate comes before any other crate that depends on it -members = ["core", "net", "p2p", "jsonrpc"] +members = ["core", "net", "net/async_rustls", "jsonrpc", "jsonrpc/impl", "p2p"] [workspace.package] description = "A library for building p2p, decentralized, and collaborative software" @@ -14,7 +14,53 @@ license = "GPL-3.0" authors = ["hozan23 <hozan23@karyontech.net>"] [workspace.dependencies] -karyon_core = { path = "core", default-features = false } -karyon_net = { path = "net", default-features = false } -karyon_jsonrpc = { path = "jsonrpc", default-features = false } -karyon_p2p = { path = "p2p", default-features = false } +karyon_core = { path = "core", version = "0.1.6", default-features = false } + +karyon_net = { path = "net", version = "0.1.6", default-features = false } +karyon_async_rustls = { path = "net/async_rustls", version = "0.1.6", default-features = false } + +karyon_jsonrpc = { path = "jsonrpc", version = "0.1.6", default-features = false } +karyon_jsonrpc_macro = { path = "jsonrpc/impl", version = "0.1.6", default-features = false } + +karyon_p2p = { path = "p2p", version = "0.1.6", default-features = false } + +log = "0.4" +thiserror = "1.0" +chrono = "0.4" +rand = "0.8" +url = "2.5" +parking_lot = "0.12" +once_cell = "1.19" +semver = "1.0" +sha2 = "0.10" + +# async +async-channel = "2.3" +async-trait = "0.1" +pin-project-lite = "0.2" +async-process = "2.2" +smol = "2.0" +tokio = "1.38" +futures-util = { version = "0.3", default-features = false } + +# encode +bincode = "2.0.0-rc.3" +serde = "1.0" +serde_json = "1.0" +base64 = "0.22" + +# macros +proc-macro2 = "1.0" +quote = "1.0" +syn = "2.0" + +# websocket +async-tungstenite = { version = "0.28", default-features = false } + +# tls +rustls-pki-types = "1.9" +futures-rustls = "0.26" +tokio-rustls = "0.26" +rcgen = "0.13" +yasna = "0.5" +x509-parser = "0.16" diff --git a/core/Cargo.toml b/core/Cargo.toml index 895ddf6..a43537b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,26 +15,23 @@ tokio = ["dep:tokio"] smol = ["dep:smol", "async-process"] [dependencies] -log = "0.4.21" -thiserror = "1.0.61" -chrono = "0.4.38" -rand = "0.8.5" +log = { workspace = true } +thiserror = { workspace = true } +chrono = { workspace = true } +rand = { workspace = true } +parking_lot = { workspace = true } dirs = "5.0.1" -parking_lot = "0.12.3" -once_cell = "1.19.0" +once_cell = { workspace = true } -# async -async-channel = "2.3.1" -pin-project-lite = "0.2.14" -async-process = { version = "2.2.3", optional = true } -smol = { version = "2.0.0", optional = true } -tokio = { version = "1.38.0", features = ["full"], optional = true } -futures-util = { version = "0.3.5", features = [ - "alloc", +async-channel = { workspace = true } +pin-project-lite = { workspace = true } +async-process = { workspace = true, optional = true } +smol = { workspace = true, optional = true } +tokio = { workspace = true, features = ["full"], optional = true } +futures-util = { workspace = true, features = [ + "alloc", ], default-features = false } -# encode -bincode = "2.0.0-rc.3" +bincode = { workspace = true } -# crypto ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true } diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs index e425eda..44705c6 100644 --- a/core/src/async_util/condvar.rs +++ b/core/src/async_util/condvar.rs @@ -58,7 +58,6 @@ use crate::{async_runtime::lock::MutexGuard, util::random_16}; /// }; /// /// ``` - pub struct CondVar { inner: Mutex<Wakers>, } @@ -116,7 +115,7 @@ impl<'a, T> CondVarAwait<'a, T> { } } -impl<'a, T> Future for CondVarAwait<'a, T> { +impl<T> Future for CondVarAwait<'_, T> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { @@ -155,7 +154,7 @@ impl<'a, T> Future for CondVarAwait<'a, T> { } } -impl<'a, T> Drop for CondVarAwait<'a, T> { +impl<T> Drop for CondVarAwait<'_, T> { fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.condvar.inner.lock(); diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs index c55b9a1..39bbf5c 100644 --- a/core/src/async_util/task_group.rs +++ b/core/src/async_util/task_group.rs @@ -126,7 +126,7 @@ pub struct TaskHandler { cancel_flag: Arc<CondWait>, } -impl<'a> TaskHandler { +impl TaskHandler { /// Creates a new task handler fn new<T, Fut, CallbackF, CallbackFut>( ex: Executor, diff --git a/core/src/lib.rs b/core/src/lib.rs index a7192d9..fedb226 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -23,7 +23,6 @@ pub mod pubsub; pub mod async_runtime; #[cfg(feature = "crypto")] - /// Collects common cryptographic tools pub mod crypto; diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 2260b7e..bdbcb74 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -17,39 +17,38 @@ tls = ["tcp", "karyon_net/tls"] ws = ["tcp", "karyon_net/ws", "async-tungstenite"] unix = ["karyon_net/unix"] smol = [ - "karyon_core/smol", - "karyon_net/smol", - "karyon_jsonrpc_macro/smol", - "async-tungstenite?/async-std-runtime", + "karyon_core/smol", + "karyon_net/smol", + "karyon_jsonrpc_macro/smol", + "async-tungstenite?/async-std-runtime", ] tokio = [ - "karyon_core/tokio", - "karyon_net/tokio", - "karyon_jsonrpc_macro/tokio", - "async-tungstenite?/tokio-runtime", + "karyon_core/tokio", + "karyon_net/tokio", + "karyon_jsonrpc_macro/tokio", + "async-tungstenite?/tokio-runtime", ] [dependencies] -karyon_core = { version = "0.1.6", path = "../core", default-features = false } -karyon_net = { version = "0.1.6", path = "../net", default-features = false } +karyon_core = { workspace = true } +karyon_net = { workspace = true } +karyon_jsonrpc_macro = { workspace = true } -karyon_jsonrpc_macro = { version = "0.1.6", path = "impl", default-features = false } - -log = "0.4.21" -rand = "0.8.5" -thiserror = "1.0.61" +log = { workspace = true } +rand = { workspace = true } +thiserror = { workspace = true } # encode/decode -serde = { version = "1.0.203", features = ["derive"] } -serde_json = "1.0.117" +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } # async -async-trait = "0.1.80" -async-channel = "2.3.1" +async-trait = { workspace = true } +async-channel = { workspace = true } # websocket -async-tungstenite = { version = "0.26.2", default-features = false, optional = true } +async-tungstenite = { workspace = true, optional = true } [dev-dependencies] +smol = { workspace = true } env_logger = "0.11.3" -smol = "2.0.0" diff --git a/jsonrpc/impl/Cargo.toml b/jsonrpc/impl/Cargo.toml index cf001e1..0bf7890 100644 --- a/jsonrpc/impl/Cargo.toml +++ b/jsonrpc/impl/Cargo.toml @@ -19,8 +19,8 @@ smol = [] tokio = [] [dependencies] -proc-macro2 = "1.0" -quote = "1.0" -syn = { version = "2.0", features = ["full"] } +proc-macro2 = { workspace = true } +quote = { workspace = true } +syn = { workspace = true, features = ["full"] } -serde_json = "1.0.117" +serde_json = { workspace = true } diff --git a/jsonrpc/impl/src/lib.rs b/jsonrpc/impl/src/lib.rs index 8814e61..d7ea466 100644 --- a/jsonrpc/impl/src/lib.rs +++ b/jsonrpc/impl/src/lib.rs @@ -54,9 +54,9 @@ pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { let item: TokenStream2 = item.into(); quote! { impl karyon_jsonrpc::RPCService for #self_ty { - fn get_method<'a>( - &'a self, - name: &'a str + fn get_method( + &self, + name: &str ) -> Option<karyon_jsonrpc::RPCMethod> { match name { #(#impl_methods)* @@ -115,9 +115,9 @@ pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { let item: TokenStream2 = item.into(); quote! { impl karyon_jsonrpc::PubSubRPCService for #self_ty { - fn get_pubsub_method<'a>( - &'a self, - name: &'a str + fn get_pubsub_method( + &self, + name: &str ) -> Option<karyon_jsonrpc::PubSubRPCMethod> { match name { #(#impl_methods)* diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index 909b0b0..0775880 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -12,6 +12,6 @@ type PubSubRPCMethodOutput<'a> = /// Defines the interface for an RPC service. pub trait PubSubRPCService: Sync + Send { - fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option<PubSubRPCMethod>; + fn get_pubsub_method(&self, name: &str) -> Option<PubSubRPCMethod>; fn name(&self) -> String; } diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs index 787da86..5195d2c 100644 --- a/jsonrpc/src/server/service.rs +++ b/jsonrpc/src/server/service.rs @@ -9,6 +9,6 @@ type RPCMethodOutput<'a> = /// Defines the interface for an RPC service. pub trait RPCService: Sync + Send { - fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>; + fn get_method(&self, name: &str) -> Option<RPCMethod>; fn name(&self) -> String; } diff --git a/net/Cargo.toml b/net/Cargo.toml index 01c7d3e..1a1a34d 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -20,44 +20,40 @@ ws = ["tcp", "async-tungstenite"] udp = [] unix = ["stream"] smol = [ - "karyon_core/smol", - "async-tungstenite?/async-std-runtime", - "karyon_async_rustls?/smol", + "karyon_core/smol", + "async-tungstenite?/async-std-runtime", + "karyon_async_rustls?/smol", ] tokio = [ - "karyon_core/tokio", - "async-tungstenite?/tokio-runtime", - "karyon_async_rustls?/tokio", - "dep:tokio", + "karyon_core/tokio", + "async-tungstenite?/tokio-runtime", + "karyon_async_rustls?/tokio", + "dep:tokio", ] serde = ["dep:serde"] [dependencies] -karyon_core = { version = "0.1.6", path = "../core", default-features = false } -karyon_async_rustls = { version = "0.1.6", path = "./async_rustls", default-features = false, optional = true } +karyon_core = { workspace = true } +karyon_async_rustls = { workspace = true, optional = true } -log = "0.4.21" -thiserror = "1.0.61" -url = "2.5.2" - -# encode/decode -serde = { version = "1.0.203", features = ["derive"], optional = true } -bincode = { version = "2.0.0-rc.3", features = ["derive"] } +log = { workspace = true } +thiserror = { workspace = true } +url = { workspace = true } +serde = { workspace = true, features = ["derive"], optional = true } +bincode = { workspace = true, features = ["derive"] } # async -async-trait = "0.1.80" -async-channel = "2.3.1" -futures-util = { version = "0.3.30", default-features = false, features = [ - "sink", -], optional = true } -pin-project-lite = { version = "0.2.14", optional = true } -tokio = { version = "1.38.0", features = ["io-util"], optional = true } +async-trait = { workspace = true } +async-channel = { workspace = true } +futures-util = { workspace = true, features = ["sink"], optional = true } +pin-project-lite = { workspace = true, optional = true } +tokio = { workspace = true, features = ["io-util"], optional = true } # websocket -async-tungstenite = { version = "0.26.2", default-features = false, optional = true } +async-tungstenite = { workspace = true, optional = true } # tls -rustls-pki-types = { version = "1.7.0", optional = true } +rustls-pki-types = { workspace = true, optional = true } [dev-dependencies] -smol = "2.0.0" +smol = { workspace = true } diff --git a/net/async_rustls/Cargo.toml b/net/async_rustls/Cargo.toml index 66ac2cc..871f02e 100644 --- a/net/async_rustls/Cargo.toml +++ b/net/async_rustls/Cargo.toml @@ -14,6 +14,5 @@ smol = ["futures-rustls"] tokio = ["tokio-rustls"] [dependencies] -futures-rustls = { version = "0.26.0", optional = true } -tokio-rustls = { version = "0.26.0", optional = true } - +futures-rustls = { workspace = true, optional = true } +tokio-rustls = { workspace = true, optional = true } diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs index c3626ec..dff703d 100644 --- a/net/src/endpoint.rs +++ b/net/src/endpoint.rs @@ -45,6 +45,98 @@ pub enum Endpoint { Unix(PathBuf), } +impl Endpoint { + /// Creates a new TCP endpoint from a `SocketAddr`. + pub fn new_tcp_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new UDP endpoint from a `SocketAddr`. + pub fn new_udp_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new TLS endpoint from a `SocketAddr`. + pub fn new_tls_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Tls(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new WS endpoint from a `SocketAddr`. + pub fn new_ws_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Ws(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new WSS endpoint from a `SocketAddr`. + pub fn new_wss_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Wss(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new Unix endpoint from a `UnixSocketAddr`. + pub fn new_unix_addr(addr: &std::path::Path) -> Endpoint { + Endpoint::Unix(addr.to_path_buf()) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Tcp`. + pub fn is_tcp(&self) -> bool { + matches!(self, Endpoint::Tcp(..)) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Tls`. + pub fn is_tls(&self) -> bool { + matches!(self, Endpoint::Tls(..)) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Ws`. + pub fn is_ws(&self) -> bool { + matches!(self, Endpoint::Ws(..)) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Wss`. + pub fn is_wss(&self) -> bool { + matches!(self, Endpoint::Wss(..)) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Udp`. + pub fn is_udp(&self) -> bool { + matches!(self, Endpoint::Udp(..)) + } + + #[inline] + /// Checks if the `Endpoint` is of type `Unix`. + pub fn is_unix(&self) -> bool { + matches!(self, Endpoint::Unix(..)) + } + + /// Returns the `Port` of the endpoint. + pub fn port(&self) -> Result<&Port> { + match self { + Endpoint::Tcp(_, port) + | Endpoint::Udp(_, port) + | Endpoint::Tls(_, port) + | Endpoint::Ws(_, port) + | Endpoint::Wss(_, port) => Ok(port), + _ => Err(Error::TryFromEndpoint), + } + } + + /// Returns the `Addr` of the endpoint. + pub fn addr(&self) -> Result<&Addr> { + match self { + Endpoint::Tcp(addr, _) + | Endpoint::Udp(addr, _) + | Endpoint::Tls(addr, _) + | Endpoint::Ws(addr, _) + | Endpoint::Wss(addr, _) => Ok(addr), + _ => Err(Error::TryFromEndpoint), + } + } +} + impl std::fmt::Display for Endpoint { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { @@ -152,62 +244,6 @@ impl FromStr for Endpoint { } } -impl Endpoint { - /// Creates a new TCP endpoint from a `SocketAddr`. - pub fn new_tcp_addr(addr: SocketAddr) -> Endpoint { - Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new UDP endpoint from a `SocketAddr`. - pub fn new_udp_addr(addr: SocketAddr) -> Endpoint { - Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new TLS endpoint from a `SocketAddr`. - pub fn new_tls_addr(addr: SocketAddr) -> Endpoint { - Endpoint::Tls(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new WS endpoint from a `SocketAddr`. - pub fn new_ws_addr(addr: SocketAddr) -> Endpoint { - Endpoint::Ws(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new WSS endpoint from a `SocketAddr`. - pub fn new_wss_addr(addr: SocketAddr) -> Endpoint { - Endpoint::Wss(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new Unix endpoint from a `UnixSocketAddr`. - pub fn new_unix_addr(addr: &std::path::Path) -> Endpoint { - Endpoint::Unix(addr.to_path_buf()) - } - - /// Returns the `Port` of the endpoint. - pub fn port(&self) -> Result<&Port> { - match self { - Endpoint::Tcp(_, port) - | Endpoint::Udp(_, port) - | Endpoint::Tls(_, port) - | Endpoint::Ws(_, port) - | Endpoint::Wss(_, port) => Ok(port), - _ => Err(Error::TryFromEndpoint), - } - } - - /// Returns the `Addr` of the endpoint. - pub fn addr(&self) -> Result<&Addr> { - match self { - Endpoint::Tcp(addr, _) - | Endpoint::Udp(addr, _) - | Endpoint::Tls(addr, _) - | Endpoint::Ws(addr, _) - | Endpoint::Wss(addr, _) => Ok(addr), - _ => Err(Error::TryFromEndpoint), - } - } -} - /// Addr defines a type for an address, either IP or domain. #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 9c303e9..15f8cac 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -18,50 +18,40 @@ tokio = ["karyon_core/tokio", "karyon_net/tokio", "tokio-rustls"] serde = ["dep:serde", "karyon_net/serde"] [dependencies] -karyon_core = { workspace = true, features = [ - "crypto", -], default-features = false } -karyon_net = { workspace = true, default-features = false, features = [ - "tcp", - "tls", - "udp", -] } +karyon_core = { workspace = true, features = ["crypto"] } +karyon_net = { workspace = true, features = ["tcp", "tls", "udp"] } -log = "0.4.21" -chrono = "0.4.38" -rand = "0.8.5" -thiserror = "1.0.61" -semver = "1.0.23" -sha2 = "0.10.8" -parking_lot = "0.12.3" +log = { workspace = true } +chrono = { workspace = true } +rand = { workspace = true } +thiserror = { workspace = true } +semver = { workspace = true } +sha2 = { workspace = true } +parking_lot = { workspace = true } # encode/decode -bincode = { version = "2.0.0-rc.3", features = ["derive"] } -base64 = "0.22.1" -serde = { version = "1.0.203", features = ["derive"], optional = true } +bincode = { workspace = true, features = ["derive"] } +base64 = { workspace = true } +serde = { workspace = true, features = ["derive"], optional = true } # async -async-trait = "0.1.80" -async-channel = "2.3.1" -futures-util = { version = "0.3.5", features = [ - "alloc", -], default-features = false } +async-trait = { workspace = true } +async-channel = { workspace = true } +futures-util = { workspace = true, features = ["alloc"] } # tls -rcgen = "0.13.1" -yasna = "0.5.2" -x509-parser = "0.16.0" -futures-rustls = { version = "0.26.0", features = [ - "aws-lc-rs", -], optional = true } -tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true } -rustls-pki-types = "1.7.0" +rcgen = { workspace = true } +yasna = { workspace = true } +x509-parser = { workspace = true } +futures-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true } +tokio-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true } +rustls-pki-types = { workspace = true } [dev-dependencies] +smol = { workspace = true } blocking = "1.6.1" clap = { version = "4.5.7", features = ["derive"] } ctrlc = "3.4.4" easy-parallel = "3.3.1" env_logger = "0.11.3" -smol = "2.0.0" diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs index c1a7a8c..f2e9d1e 100644 --- a/p2p/src/connection.rs +++ b/p2p/src/connection.rs @@ -74,8 +74,7 @@ impl Connection { pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> { match self.listeners.get(&P::id()) { Some(l) => l.recv().await.map_err(Error::from), - // TODO - None => todo!(), + None => Err(Error::UnsupportedProtocol(P::id())), } } diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index cfa661b..98cdfc7 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -148,6 +148,10 @@ impl Connector { async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> { if self.enable_tls { + if !endpoint.is_tcp() && !endpoint.is_tls() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + let tls_config = tls::ClientTlsConfig { tcp_config: Default::default(), client_config: tls_client_config(&self.key_pair, peer_id.clone())?, @@ -157,6 +161,10 @@ impl Connector { .await .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) } else { + if !endpoint.is_tcp() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 47a1d09..71cc5b8 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -41,6 +41,9 @@ pub struct LookupService { /// Resolved listen endpoint listen_endpoint: RwLock<Option<Endpoint>>, + /// Resolved discovery endpoint + discovery_endpoint: RwLock<Option<Endpoint>>, + /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -52,7 +55,6 @@ impl LookupService { /// Creates a new lookup service pub fn new( key_pair: &KeyPair, - id: &PeerID, table: Arc<RoutingTable>, config: Arc<Config>, monitor: Arc<Monitor>, @@ -78,13 +80,18 @@ impl LookupService { ex, ); + let id = key_pair + .public() + .try_into() + .expect("Get PeerID from KeyPair"); Self { - id: id.clone(), + id, table, listener, connector, outbound_slots, listen_endpoint: RwLock::new(None), + discovery_endpoint: RwLock::new(None), config, monitor, } @@ -98,19 +105,23 @@ impl LookupService { /// Set the resolved listen endpoint. pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> { - let resolved_endpoint = Endpoint::Tcp( + let discovery_endpoint = Endpoint::Tcp( resolved_endpoint.addr()?.clone(), self.config.discovery_port, ); - *self.listen_endpoint.write() = Some(resolved_endpoint); + *self.listen_endpoint.write() = Some(resolved_endpoint.clone()); + *self.discovery_endpoint.write() = Some(discovery_endpoint.clone()); Ok(()) } - /// Get the listening endpoint. pub fn listen_endpoint(&self) -> Option<Endpoint> { self.listen_endpoint.read().clone() } + pub fn discovery_endpoint(&self) -> Option<Endpoint> { + self.discovery_endpoint.read().clone() + } + /// Shuts down the lookup service. pub async fn shutdown(&self) { self.connector.shutdown().await; @@ -204,7 +215,7 @@ impl LookupService { /// Starts a self lookup async fn self_lookup( &self, - random_peers: &Vec<PeerMsg>, + random_peers: &[PeerMsg], peer_buffer: &mut Vec<PeerMsg>, ) -> Result<()> { let mut results = FuturesUnordered::new(); @@ -278,7 +289,7 @@ impl LookupService { trace!("Send Peer msg"); if let Some(endpoint) = self.listen_endpoint() { - self.send_peer_msg(&conn, endpoint.clone()).await?; + self.send_peer_msg(&conn, endpoint).await?; } trace!("Send Shutdown msg"); @@ -289,8 +300,8 @@ impl LookupService { /// Start a listener. async fn start_listener(self: &Arc<Self>) -> Result<()> { - let endpoint: Endpoint = match self.listen_endpoint() { - Some(e) => e.clone(), + let endpoint: Endpoint = match self.discovery_endpoint() { + Some(e) => e, None => return Ok(()), }; diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index a81a817..b4c20bf 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -80,7 +80,6 @@ impl Discovery { let lookup_service = Arc::new(LookupService::new( key_pair, - peer_id, table.clone(), config.clone(), monitor.clone(), diff --git a/p2p/src/error.rs b/p2p/src/error.rs index a490b57..cc30aff 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("Unsupported protocol error: {0}")] UnsupportedProtocol(String), + #[error("Unsupported Endpoint: {0}")] + UnsupportedEndpoint(String), + #[error("PeerID try from PublicKey Error")] PeerIDTryFromPublicKey, diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 8a5deaa..347099d 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -157,6 +157,10 @@ impl Listener { async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> { if self.enable_tls { + if !endpoint.is_tcp() && !endpoint.is_tls() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + let tls_config = tls::ServerTlsConfig { tcp_config: Default::default(), server_config: tls_server_config(&self.key_pair)?, @@ -165,6 +169,10 @@ impl Listener { .await .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) } else { + if !endpoint.is_tcp() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) |