aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock9
-rw-r--r--core/src/event.rs10
-rw-r--r--p2p/Cargo.toml1
-rw-r--r--p2p/examples/monitor.rs183
-rw-r--r--p2p/src/discovery/lookup.rs6
-rw-r--r--p2p/src/discovery/refresh.rs16
-rw-r--r--p2p/src/monitor.rs2
7 files changed, 193 insertions, 34 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 00de6a8..81f0311 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1326,6 +1326,7 @@ dependencies = [
"rcgen 0.12.1",
"rustls-pki-types",
"semver",
+ "serde",
"serde_json",
"sha2",
"smol",
@@ -1961,18 +1962,18 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
-version = "1.0.201"
+version = "1.0.202"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c"
+checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.201"
+version = "1.0.202"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865"
+checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
dependencies = [
"proc-macro2",
"quote",
diff --git a/core/src/event.rs b/core/src/event.rs
index 876fda5..a32677b 100644
--- a/core/src/event.rs
+++ b/core/src/event.rs
@@ -243,9 +243,9 @@ where
/// Cancels the listener and removes it from the `EventSys`.
pub async fn cancel(&self) {
- self.event_sys()
- .remove(&self.topic, &self.event_id, &self.id)
- .await;
+ if let Some(es) = self.event_sys.upgrade() {
+ es.remove(&self.topic, &self.event_id, &self.id).await;
+ }
}
/// Returns the topic for this event listener.
@@ -257,10 +257,6 @@ where
pub async fn event_id(&self) -> &String {
&self.event_id
}
-
- fn event_sys(&self) -> ArcEventSys<T> {
- self.event_sys.upgrade().unwrap()
- }
}
/// An event within the [`EventSys`].
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 83df9c0..22236e3 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -58,3 +58,4 @@ env_logger = "0.11.3"
smol = "2.0.0"
karyon_jsonrpc = { workspace = true, features = ["ws", "smol"] }
serde_json = "1.0.117"
+serde = { version = "1.0.202", features = ["derive"] }
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index 5382781..1629207 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -1,14 +1,16 @@
mod shared;
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
use clap::Parser;
use log::error;
+use serde::{Deserialize, Serialize};
use smol::{channel, Executor};
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
+ monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent},
ArcBackend, Backend, Config,
};
@@ -81,8 +83,57 @@ impl MonitorRPC {
let conn_events = self.backend.monitor().conn_events().await;
smol::spawn(async move {
loop {
- let _event = conn_events.recv().await;
- if let Err(err) = sub.notify(serde_json::json!("event")).await {
+ let event = conn_events.recv().await.unwrap();
+ let event: ConnEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn peer_pool_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let peer_pool_events = self.backend.monitor().peer_pool_events().await;
+ smol::spawn(async move {
+ loop {
+ let event = peer_pool_events.recv().await.unwrap();
+ let event: PeerPoolEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn discovery_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let discovery_events = self.backend.monitor().discovery_events().await;
+ smol::spawn(async move {
+ loop {
+ let event = discovery_events.recv().await.unwrap();
+ let event: DiscoveryEventJson = event.into();
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
error!("Failed to notify: {err}");
break;
}
@@ -103,6 +154,28 @@ impl MonitorRPC {
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
}
+
+ async fn peer_pool_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ async fn discovery_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
}
fn main() {
@@ -148,12 +221,12 @@ fn main() {
.await
.expect("Build rpc server");
- // Run the backend
- backend.run().await.expect("Run p2p backend");
-
// Run the RPC server
server.start().await;
+ // Run the backend
+ backend.run().await.expect("Run p2p backend");
+
// Wait for ctrlc signal
ctrlc_r.recv().await.expect("Wait for ctrlc signal");
@@ -166,3 +239,101 @@ fn main() {
ex,
);
}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct ConnEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ endpoint: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct PeerPoolEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ peer_id: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct DiscoveryEventJson {
+ name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ endpoint: Option<String>,
+}
+
+impl From<ConnEvent> for ConnEventJson {
+ fn from(item: ConnEvent) -> Self {
+ match item {
+ ConnEvent::Connected(ref e) => ConnEventJson {
+ name: "Connected".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::Disconnected(e) => ConnEventJson {
+ name: "Disconnected".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ConnectFailed(e) => ConnEventJson {
+ name: "ConnectFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ConnectRetried(e) => ConnEventJson {
+ name: "ConnectRetried".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::Accepted(e) => ConnEventJson {
+ name: "Accepted".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::AcceptFailed => ConnEventJson {
+ name: "AcceptFailed".into(),
+ endpoint: None,
+ },
+ ConnEvent::Listening(e) => ConnEventJson {
+ name: "Listening".into(),
+ endpoint: Some(e.to_string()),
+ },
+ ConnEvent::ListenFailed(e) => ConnEventJson {
+ name: "ListenFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ }
+ }
+}
+
+impl From<PeerPoolEvent> for PeerPoolEventJson {
+ fn from(item: PeerPoolEvent) -> Self {
+ match item {
+ PeerPoolEvent::NewPeer(id) => PeerPoolEventJson {
+ name: "NewPeer".into(),
+ peer_id: Some(id.to_string()),
+ },
+ PeerPoolEvent::RemovePeer(id) => PeerPoolEventJson {
+ name: "RemovePeer".into(),
+ peer_id: Some(id.to_string()),
+ },
+ }
+ }
+}
+
+impl From<DiscoveryEvent> for DiscoveryEventJson {
+ fn from(item: DiscoveryEvent) -> Self {
+ match item {
+ DiscoveryEvent::RefreshStarted => DiscoveryEventJson {
+ name: "RefreshStarted".into(),
+ endpoint: None,
+ },
+ DiscoveryEvent::LookupStarted(e) => DiscoveryEventJson {
+ name: "LookupStarted".into(),
+ endpoint: Some(e.to_string()),
+ },
+ DiscoveryEvent::LookupFailed(e) => DiscoveryEventJson {
+ name: "LookupFailed".into(),
+ endpoint: Some(e.to_string()),
+ },
+ DiscoveryEvent::LookupSucceeded(e, _) => DiscoveryEventJson {
+ name: "LookupSucceeded".into(),
+ endpoint: Some(e.to_string()),
+ },
+ }
+ }
+}
diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs
index 613c3cd..4a06083 100644
--- a/p2p/src/discovery/lookup.rs
+++ b/p2p/src/discovery/lookup.rs
@@ -234,14 +234,12 @@ impl LookupService {
) -> Result<Vec<PeerMsg>> {
let conn = self.connector.connect(&endpoint, &peer_id).await?;
self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::Connected(endpoint.clone())))
+ .notify(ConnEvent::Connected(endpoint.clone()))
.await;
let result = self.handle_outbound(conn, target_peer_id).await;
- self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))
- .await;
+ self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
self.outbound_slots.remove().await;
result
diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs
index 430e749..745a5d5 100644
--- a/p2p/src/discovery/refresh.rs
+++ b/p2p/src/discovery/refresh.rs
@@ -211,15 +211,13 @@ impl RefreshService {
let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await {
Ok(c) => {
self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::Listening(endpoint.clone())))
+ .notify(ConnEvent::Listening(endpoint.clone()))
.await;
c
}
Err(err) => {
self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::ListenFailed(
- endpoint.clone(),
- )))
+ .notify(ConnEvent::ListenFailed(endpoint.clone()))
.await;
return Err(err.into());
}
@@ -230,9 +228,7 @@ impl RefreshService {
let res = self.listen_to_ping_msg(&conn).await;
if let Err(err) = res {
trace!("Failed to handle ping msg {err}");
- self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::AcceptFailed))
- .await;
+ self.monitor.notify(ConnEvent::AcceptFailed).await;
}
}
}
@@ -241,7 +237,7 @@ impl RefreshService {
async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> {
let (msg, endpoint) = conn.recv().await?;
self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::Accepted(endpoint.clone())))
+ .notify(ConnEvent::Accepted(endpoint.clone()))
.await;
match msg {
@@ -252,9 +248,7 @@ impl RefreshService {
RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())),
}
- self.monitor
- .notify(DiscoveryEvent::Conn(ConnEvent::Disconnected(endpoint)))
- .await;
+ self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
Ok(())
}
diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs
index 945c6aa..b7afb7c 100644
--- a/p2p/src/monitor.rs
+++ b/p2p/src/monitor.rs
@@ -107,7 +107,6 @@ pub enum PeerPoolEvent {
#[derive(Clone, Debug)]
pub enum DiscoveryEvent {
LookupStarted(Endpoint),
- Conn(ConnEvent),
LookupFailed(Endpoint),
LookupSucceeded(Endpoint, usize),
RefreshStarted,
@@ -143,7 +142,6 @@ impl fmt::Display for DiscoveryEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let val = match self {
DiscoveryEvent::LookupStarted(endpoint) => format!("LookupStarted: {endpoint}"),
- DiscoveryEvent::Conn(event) => format!("Connection event: {event}"),
DiscoveryEvent::LookupFailed(endpoint) => format!("LookupFailed: {endpoint}"),
DiscoveryEvent::LookupSucceeded(endpoint, len) => {
format!("LookupSucceeded: {endpoint} {len}")