aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/Cargo.toml54
-rw-r--r--p2p/src/connection.rs3
-rw-r--r--p2p/src/connector.rs8
-rw-r--r--p2p/src/discovery/lookup.rs29
-rw-r--r--p2p/src/discovery/mod.rs1
-rw-r--r--p2p/src/error.rs3
-rw-r--r--p2p/src/listener.rs8
7 files changed, 62 insertions, 44 deletions
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 9c303e9..15f8cac 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -18,50 +18,40 @@ tokio = ["karyon_core/tokio", "karyon_net/tokio", "tokio-rustls"]
serde = ["dep:serde", "karyon_net/serde"]
[dependencies]
-karyon_core = { workspace = true, features = [
- "crypto",
-], default-features = false }
-karyon_net = { workspace = true, default-features = false, features = [
- "tcp",
- "tls",
- "udp",
-] }
+karyon_core = { workspace = true, features = ["crypto"] }
+karyon_net = { workspace = true, features = ["tcp", "tls", "udp"] }
-log = "0.4.21"
-chrono = "0.4.38"
-rand = "0.8.5"
-thiserror = "1.0.61"
-semver = "1.0.23"
-sha2 = "0.10.8"
-parking_lot = "0.12.3"
+log = { workspace = true }
+chrono = { workspace = true }
+rand = { workspace = true }
+thiserror = { workspace = true }
+semver = { workspace = true }
+sha2 = { workspace = true }
+parking_lot = { workspace = true }
# encode/decode
-bincode = { version = "2.0.0-rc.3", features = ["derive"] }
-base64 = "0.22.1"
-serde = { version = "1.0.203", features = ["derive"], optional = true }
+bincode = { workspace = true, features = ["derive"] }
+base64 = { workspace = true }
+serde = { workspace = true, features = ["derive"], optional = true }
# async
-async-trait = "0.1.80"
-async-channel = "2.3.1"
-futures-util = { version = "0.3.5", features = [
- "alloc",
-], default-features = false }
+async-trait = { workspace = true }
+async-channel = { workspace = true }
+futures-util = { workspace = true, features = ["alloc"] }
# tls
-rcgen = "0.13.1"
-yasna = "0.5.2"
-x509-parser = "0.16.0"
-futures-rustls = { version = "0.26.0", features = [
- "aws-lc-rs",
-], optional = true }
-tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true }
-rustls-pki-types = "1.7.0"
+rcgen = { workspace = true }
+yasna = { workspace = true }
+x509-parser = { workspace = true }
+futures-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true }
+tokio-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true }
+rustls-pki-types = { workspace = true }
[dev-dependencies]
+smol = { workspace = true }
blocking = "1.6.1"
clap = { version = "4.5.7", features = ["derive"] }
ctrlc = "3.4.4"
easy-parallel = "3.3.1"
env_logger = "0.11.3"
-smol = "2.0.0"
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index c1a7a8c..f2e9d1e 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -74,8 +74,7 @@ impl Connection {
pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
match self.listeners.get(&P::id()) {
Some(l) => l.recv().await.map_err(Error::from),
- // TODO
- None => todo!(),
+ None => Err(Error::UnsupportedProtocol(P::id())),
}
}
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index cfa661b..98cdfc7 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -148,6 +148,10 @@ impl Connector {
async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ClientTlsConfig {
tcp_config: Default::default(),
client_config: tls_client_config(&self.key_pair, peer_id.clone())?,
@@ -157,6 +161,10 @@ impl Connector {
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 47a1d09..71cc5b8 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -41,6 +41,9 @@ pub struct LookupService {
/// Resolved listen endpoint
listen_endpoint: RwLock<Option<Endpoint>>,
+ /// Resolved discovery endpoint
+ discovery_endpoint: RwLock<Option<Endpoint>>,
+
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -52,7 +55,6 @@ impl LookupService {
/// Creates a new lookup service
pub fn new(
key_pair: &KeyPair,
- id: &PeerID,
table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
@@ -78,13 +80,18 @@ impl LookupService {
ex,
);
+ let id = key_pair
+ .public()
+ .try_into()
+ .expect("Get PeerID from KeyPair");
Self {
- id: id.clone(),
+ id,
table,
listener,
connector,
outbound_slots,
listen_endpoint: RwLock::new(None),
+ discovery_endpoint: RwLock::new(None),
config,
monitor,
}
@@ -98,19 +105,23 @@ impl LookupService {
/// Set the resolved listen endpoint.
pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
- let resolved_endpoint = Endpoint::Tcp(
+ let discovery_endpoint = Endpoint::Tcp(
resolved_endpoint.addr()?.clone(),
self.config.discovery_port,
);
- *self.listen_endpoint.write() = Some(resolved_endpoint);
+ *self.listen_endpoint.write() = Some(resolved_endpoint.clone());
+ *self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
Ok(())
}
- /// Get the listening endpoint.
pub fn listen_endpoint(&self) -> Option<Endpoint> {
self.listen_endpoint.read().clone()
}
+ pub fn discovery_endpoint(&self) -> Option<Endpoint> {
+ self.discovery_endpoint.read().clone()
+ }
+
/// Shuts down the lookup service.
pub async fn shutdown(&self) {
self.connector.shutdown().await;
@@ -204,7 +215,7 @@ impl LookupService {
/// Starts a self lookup
async fn self_lookup(
&self,
- random_peers: &Vec<PeerMsg>,
+ random_peers: &[PeerMsg],
peer_buffer: &mut Vec<PeerMsg>,
) -> Result<()> {
let mut results = FuturesUnordered::new();
@@ -278,7 +289,7 @@ impl LookupService {
trace!("Send Peer msg");
if let Some(endpoint) = self.listen_endpoint() {
- self.send_peer_msg(&conn, endpoint.clone()).await?;
+ self.send_peer_msg(&conn, endpoint).await?;
}
trace!("Send Shutdown msg");
@@ -289,8 +300,8 @@ impl LookupService {
/// Start a listener.
async fn start_listener(self: &Arc<Self>) -> Result<()> {
- let endpoint: Endpoint = match self.listen_endpoint() {
- Some(e) => e.clone(),
+ let endpoint: Endpoint = match self.discovery_endpoint() {
+ Some(e) => e,
None => return Ok(()),
};
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index a81a817..b4c20bf 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -80,7 +80,6 @@ impl Discovery {
let lookup_service = Arc::new(LookupService::new(
key_pair,
- peer_id,
table.clone(),
config.clone(),
monitor.clone(),
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index a490b57..cc30aff 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -11,6 +11,9 @@ pub enum Error {
#[error("Unsupported protocol error: {0}")]
UnsupportedProtocol(String),
+ #[error("Unsupported Endpoint: {0}")]
+ UnsupportedEndpoint(String),
+
#[error("PeerID try from PublicKey Error")]
PeerIDTryFromPublicKey,
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 8a5deaa..347099d 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -157,6 +157,10 @@ impl Listener {
async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ServerTlsConfig {
tcp_config: Default::default(),
server_config: tls_server_config(&self.key_pair)?,
@@ -165,6 +169,10 @@ impl Listener {
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)