diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/Cargo.toml | 54 | ||||
-rw-r--r-- | p2p/src/connection.rs | 3 | ||||
-rw-r--r-- | p2p/src/connector.rs | 8 | ||||
-rw-r--r-- | p2p/src/discovery/lookup.rs | 29 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 1 | ||||
-rw-r--r-- | p2p/src/error.rs | 3 | ||||
-rw-r--r-- | p2p/src/listener.rs | 8 |
7 files changed, 62 insertions, 44 deletions
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 9c303e9..15f8cac 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -18,50 +18,40 @@ tokio = ["karyon_core/tokio", "karyon_net/tokio", "tokio-rustls"] serde = ["dep:serde", "karyon_net/serde"] [dependencies] -karyon_core = { workspace = true, features = [ - "crypto", -], default-features = false } -karyon_net = { workspace = true, default-features = false, features = [ - "tcp", - "tls", - "udp", -] } +karyon_core = { workspace = true, features = ["crypto"] } +karyon_net = { workspace = true, features = ["tcp", "tls", "udp"] } -log = "0.4.21" -chrono = "0.4.38" -rand = "0.8.5" -thiserror = "1.0.61" -semver = "1.0.23" -sha2 = "0.10.8" -parking_lot = "0.12.3" +log = { workspace = true } +chrono = { workspace = true } +rand = { workspace = true } +thiserror = { workspace = true } +semver = { workspace = true } +sha2 = { workspace = true } +parking_lot = { workspace = true } # encode/decode -bincode = { version = "2.0.0-rc.3", features = ["derive"] } -base64 = "0.22.1" -serde = { version = "1.0.203", features = ["derive"], optional = true } +bincode = { workspace = true, features = ["derive"] } +base64 = { workspace = true } +serde = { workspace = true, features = ["derive"], optional = true } # async -async-trait = "0.1.80" -async-channel = "2.3.1" -futures-util = { version = "0.3.5", features = [ - "alloc", -], default-features = false } +async-trait = { workspace = true } +async-channel = { workspace = true } +futures-util = { workspace = true, features = ["alloc"] } # tls -rcgen = "0.13.1" -yasna = "0.5.2" -x509-parser = "0.16.0" -futures-rustls = { version = "0.26.0", features = [ - "aws-lc-rs", -], optional = true } -tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true } -rustls-pki-types = "1.7.0" +rcgen = { workspace = true } +yasna = { workspace = true } +x509-parser = { workspace = true } +futures-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true } +tokio-rustls = { workspace = true, features = ["aws-lc-rs"], optional = true } +rustls-pki-types = { workspace = true } [dev-dependencies] +smol = { workspace = true } blocking = "1.6.1" clap = { version = "4.5.7", features = ["derive"] } ctrlc = "3.4.4" easy-parallel = "3.3.1" env_logger = "0.11.3" -smol = "2.0.0" diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs index c1a7a8c..f2e9d1e 100644 --- a/p2p/src/connection.rs +++ b/p2p/src/connection.rs @@ -74,8 +74,7 @@ impl Connection { pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> { match self.listeners.get(&P::id()) { Some(l) => l.recv().await.map_err(Error::from), - // TODO - None => todo!(), + None => Err(Error::UnsupportedProtocol(P::id())), } } diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index cfa661b..98cdfc7 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -148,6 +148,10 @@ impl Connector { async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> { if self.enable_tls { + if !endpoint.is_tcp() && !endpoint.is_tls() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + let tls_config = tls::ClientTlsConfig { tcp_config: Default::default(), client_config: tls_client_config(&self.key_pair, peer_id.clone())?, @@ -157,6 +161,10 @@ impl Connector { .await .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) } else { + if !endpoint.is_tcp() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index 47a1d09..71cc5b8 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -41,6 +41,9 @@ pub struct LookupService { /// Resolved listen endpoint listen_endpoint: RwLock<Option<Endpoint>>, + /// Resolved discovery endpoint + discovery_endpoint: RwLock<Option<Endpoint>>, + /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -52,7 +55,6 @@ impl LookupService { /// Creates a new lookup service pub fn new( key_pair: &KeyPair, - id: &PeerID, table: Arc<RoutingTable>, config: Arc<Config>, monitor: Arc<Monitor>, @@ -78,13 +80,18 @@ impl LookupService { ex, ); + let id = key_pair + .public() + .try_into() + .expect("Get PeerID from KeyPair"); Self { - id: id.clone(), + id, table, listener, connector, outbound_slots, listen_endpoint: RwLock::new(None), + discovery_endpoint: RwLock::new(None), config, monitor, } @@ -98,19 +105,23 @@ impl LookupService { /// Set the resolved listen endpoint. pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> { - let resolved_endpoint = Endpoint::Tcp( + let discovery_endpoint = Endpoint::Tcp( resolved_endpoint.addr()?.clone(), self.config.discovery_port, ); - *self.listen_endpoint.write() = Some(resolved_endpoint); + *self.listen_endpoint.write() = Some(resolved_endpoint.clone()); + *self.discovery_endpoint.write() = Some(discovery_endpoint.clone()); Ok(()) } - /// Get the listening endpoint. pub fn listen_endpoint(&self) -> Option<Endpoint> { self.listen_endpoint.read().clone() } + pub fn discovery_endpoint(&self) -> Option<Endpoint> { + self.discovery_endpoint.read().clone() + } + /// Shuts down the lookup service. pub async fn shutdown(&self) { self.connector.shutdown().await; @@ -204,7 +215,7 @@ impl LookupService { /// Starts a self lookup async fn self_lookup( &self, - random_peers: &Vec<PeerMsg>, + random_peers: &[PeerMsg], peer_buffer: &mut Vec<PeerMsg>, ) -> Result<()> { let mut results = FuturesUnordered::new(); @@ -278,7 +289,7 @@ impl LookupService { trace!("Send Peer msg"); if let Some(endpoint) = self.listen_endpoint() { - self.send_peer_msg(&conn, endpoint.clone()).await?; + self.send_peer_msg(&conn, endpoint).await?; } trace!("Send Shutdown msg"); @@ -289,8 +300,8 @@ impl LookupService { /// Start a listener. async fn start_listener(self: &Arc<Self>) -> Result<()> { - let endpoint: Endpoint = match self.listen_endpoint() { - Some(e) => e.clone(), + let endpoint: Endpoint = match self.discovery_endpoint() { + Some(e) => e, None => return Ok(()), }; diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index a81a817..b4c20bf 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -80,7 +80,6 @@ impl Discovery { let lookup_service = Arc::new(LookupService::new( key_pair, - peer_id, table.clone(), config.clone(), monitor.clone(), diff --git a/p2p/src/error.rs b/p2p/src/error.rs index a490b57..cc30aff 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("Unsupported protocol error: {0}")] UnsupportedProtocol(String), + #[error("Unsupported Endpoint: {0}")] + UnsupportedEndpoint(String), + #[error("PeerID try from PublicKey Error")] PeerIDTryFromPublicKey, diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 8a5deaa..347099d 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -157,6 +157,10 @@ impl Listener { async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> { if self.enable_tls { + if !endpoint.is_tcp() && !endpoint.is_tls() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + let tls_config = tls::ServerTlsConfig { tcp_config: Default::default(), server_config: tls_server_config(&self.key_pair)?, @@ -165,6 +169,10 @@ impl Listener { .await .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) } else { + if !endpoint.is_tcp() { + return Err(Error::UnsupportedEndpoint(endpoint.to_string())); + } + tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) |