aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-16 08:16:57 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-16 08:19:34 +0200
commitcae0c15d10235bf0ec0bd6f8b20814dc7b63dfd5 (patch)
treea0724e160cdc5c556d132b07639c0225226b761a /p2p
parent6795c2a8c8a580575d107f596961e221faad69cf (diff)
p2p: check for the endpoints before listen/connect to them
Diffstat (limited to 'p2p')
-rw-r--r--p2p/src/connection.rs3
-rw-r--r--p2p/src/connector.rs8
-rw-r--r--p2p/src/discovery/lookup.rs27
-rw-r--r--p2p/src/discovery/mod.rs1
-rw-r--r--p2p/src/error.rs3
-rw-r--r--p2p/src/listener.rs8
6 files changed, 39 insertions, 11 deletions
diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs
index c1a7a8c..f2e9d1e 100644
--- a/p2p/src/connection.rs
+++ b/p2p/src/connection.rs
@@ -74,8 +74,7 @@ impl Connection {
pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
match self.listeners.get(&P::id()) {
Some(l) => l.recv().await.map_err(Error::from),
- // TODO
- None => todo!(),
+ None => Err(Error::UnsupportedProtocol(P::id())),
}
}
diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs
index cfa661b..98cdfc7 100644
--- a/p2p/src/connector.rs
+++ b/p2p/src/connector.rs
@@ -148,6 +148,10 @@ impl Connector {
async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ClientTlsConfig {
tcp_config: Default::default(),
client_config: tls_client_config(&self.key_pair, peer_id.clone())?,
@@ -157,6 +161,10 @@ impl Connector {
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 47a1d09..ba15da3 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -41,6 +41,9 @@ pub struct LookupService {
/// Resolved listen endpoint
listen_endpoint: RwLock<Option<Endpoint>>,
+ /// Resolved discovery endpoint
+ discovery_endpoint: RwLock<Option<Endpoint>>,
+
/// Holds the configuration for the P2P network.
config: Arc<Config>,
@@ -52,7 +55,6 @@ impl LookupService {
/// Creates a new lookup service
pub fn new(
key_pair: &KeyPair,
- id: &PeerID,
table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
@@ -78,13 +80,18 @@ impl LookupService {
ex,
);
+ let id = key_pair
+ .public()
+ .try_into()
+ .expect("Get PeerID from KeyPair");
Self {
- id: id.clone(),
+ id,
table,
listener,
connector,
outbound_slots,
listen_endpoint: RwLock::new(None),
+ discovery_endpoint: RwLock::new(None),
config,
monitor,
}
@@ -98,19 +105,23 @@ impl LookupService {
/// Set the resolved listen endpoint.
pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
- let resolved_endpoint = Endpoint::Tcp(
+ let discovery_endpoint = Endpoint::Tcp(
resolved_endpoint.addr()?.clone(),
self.config.discovery_port,
);
- *self.listen_endpoint.write() = Some(resolved_endpoint);
+ *self.listen_endpoint.write() = Some(resolved_endpoint.clone());
+ *self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
Ok(())
}
- /// Get the listening endpoint.
pub fn listen_endpoint(&self) -> Option<Endpoint> {
self.listen_endpoint.read().clone()
}
+ pub fn discovery_endpoint(&self) -> Option<Endpoint> {
+ self.discovery_endpoint.read().clone()
+ }
+
/// Shuts down the lookup service.
pub async fn shutdown(&self) {
self.connector.shutdown().await;
@@ -278,7 +289,7 @@ impl LookupService {
trace!("Send Peer msg");
if let Some(endpoint) = self.listen_endpoint() {
- self.send_peer_msg(&conn, endpoint.clone()).await?;
+ self.send_peer_msg(&conn, endpoint).await?;
}
trace!("Send Shutdown msg");
@@ -289,8 +300,8 @@ impl LookupService {
/// Start a listener.
async fn start_listener(self: &Arc<Self>) -> Result<()> {
- let endpoint: Endpoint = match self.listen_endpoint() {
- Some(e) => e.clone(),
+ let endpoint: Endpoint = match self.discovery_endpoint() {
+ Some(e) => e,
None => return Ok(()),
};
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index a81a817..b4c20bf 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -80,7 +80,6 @@ impl Discovery {
let lookup_service = Arc::new(LookupService::new(
key_pair,
- peer_id,
table.clone(),
config.clone(),
monitor.clone(),
diff --git a/p2p/src/error.rs b/p2p/src/error.rs
index a490b57..cc30aff 100644
--- a/p2p/src/error.rs
+++ b/p2p/src/error.rs
@@ -11,6 +11,9 @@ pub enum Error {
#[error("Unsupported protocol error: {0}")]
UnsupportedProtocol(String),
+ #[error("Unsupported Endpoint: {0}")]
+ UnsupportedEndpoint(String),
+
#[error("PeerID try from PublicKey Error")]
PeerIDTryFromPublicKey,
diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs
index 8a5deaa..347099d 100644
--- a/p2p/src/listener.rs
+++ b/p2p/src/listener.rs
@@ -157,6 +157,10 @@ impl Listener {
async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> {
if self.enable_tls {
+ if !endpoint.is_tcp() && !endpoint.is_tls() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
let tls_config = tls::ServerTlsConfig {
tcp_config: Default::default(),
server_config: tls_server_config(&self.key_pair)?,
@@ -165,6 +169,10 @@ impl Listener {
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
} else {
+ if !endpoint.is_tcp() {
+ return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
+ }
+
tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)