aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--net/src/codec/websocket.rs2
-rw-r--r--net/src/endpoint.rs6
-rw-r--r--net/src/stream/websocket.rs5
-rw-r--r--p2p/Cargo.toml4
-rw-r--r--p2p/examples/monitor.rs120
-rw-r--r--p2p/src/backend.rs11
7 files changed, 117 insertions, 33 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 708e50a..00de6a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1319,12 +1319,14 @@ dependencies = [
"futures-rustls",
"futures-util",
"karyon_core",
+ "karyon_jsonrpc",
"karyon_net",
"log",
"rand",
"rcgen 0.12.1",
"rustls-pki-types",
"semver",
+ "serde_json",
"sha2",
"smol",
"thiserror",
diff --git a/net/src/codec/websocket.rs b/net/src/codec/websocket.rs
index b59a55c..8676810 100644
--- a/net/src/codec/websocket.rs
+++ b/net/src/codec/websocket.rs
@@ -19,5 +19,5 @@ pub trait WebSocketEncoder {
pub trait WebSocketDecoder {
type DeItem;
- fn decode(&self, src: &Message) -> Result<Self::DeItem>;
+ fn decode(&self, src: &Message) -> Result<Option<Self::DeItem>>;
}
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs
index 0c7ecd1..5aebdf9 100644
--- a/net/src/endpoint.rs
+++ b/net/src/endpoint.rs
@@ -238,6 +238,12 @@ impl ToEndpoint for String {
}
}
+impl ToEndpoint for Endpoint {
+ fn to_endpoint(&self) -> Result<Endpoint> {
+ Ok(self.clone())
+ }
+}
+
impl ToEndpoint for &str {
fn to_endpoint(&self) -> Result<Endpoint> {
Endpoint::from_str(self)
diff --git a/net/src/stream/websocket.rs b/net/src/stream/websocket.rs
index 9d41626..2626d2f 100644
--- a/net/src/stream/websocket.rs
+++ b/net/src/stream/websocket.rs
@@ -47,7 +47,10 @@ where
pub async fn recv(&mut self) -> Result<C::Item> {
match self.inner.next().await {
- Some(msg) => self.codec.decode(&msg?),
+ Some(msg) => match self.codec.decode(&msg?)? {
+ Some(m) => Ok(m),
+ None => todo!(),
+ },
None => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
}
}
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index b0d0232..83df9c0 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "karyon_p2p"
-description = "A lightweight, extensible, and customizable p2p network stack."
+description = "A lightweight, extensible, and customizable p2p network stack."
version.workspace = true
edition.workspace = true
homepage.workspace = true
@@ -56,3 +56,5 @@ ctrlc = "3.4.4"
easy-parallel = "3.3.1"
env_logger = "0.11.3"
smol = "2.0.0"
+karyon_jsonrpc = { workspace = true, features = ["ws", "smol"] }
+serde_json = "1.0.117"
diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs
index 32c8959..5382781 100644
--- a/p2p/examples/monitor.rs
+++ b/p2p/examples/monitor.rs
@@ -3,14 +3,17 @@ mod shared;
use std::sync::Arc;
use clap::Parser;
+use log::error;
use smol::{channel, Executor};
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
- Backend, Config,
+ ArcBackend, Backend, Config,
};
+use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, SubscriptionID};
+
use shared::run_executor;
#[derive(Parser)]
@@ -20,6 +23,10 @@ struct Cli {
#[arg(short)]
bootstrap_peers: Vec<Endpoint>,
+ /// RPC server endpoint.
+ #[arg(short)]
+ rpc_endpoint: Endpoint,
+
/// Optional list of peer endpoints for manual connections.
#[arg(short)]
peer_endpoints: Vec<Endpoint>,
@@ -33,6 +40,71 @@ struct Cli {
discovery_port: Option<Port>,
}
+struct MonitorRPC {
+ backend: ArcBackend,
+}
+
+#[rpc_impl]
+impl MonitorRPC {
+ async fn peer_id(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.peer_id().to_string()))
+ }
+
+ async fn inbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+
+ async fn outbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+}
+
+#[rpc_pubsub_impl]
+impl MonitorRPC {
+ async fn conn_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+ let conn_events = self.backend.monitor().conn_events().await;
+ smol::spawn(async move {
+ loop {
+ let _event = conn_events.recv().await;
+ if let Err(err) = sub.notify(serde_json::json!("event")).await {
+ error!("Failed to notify: {err}");
+ break;
+ }
+ }
+ })
+ .detach();
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn conn_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> Result<serde_json::Value, karyon_jsonrpc::Error> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
fn main() {
env_logger::init();
let cli = Cli::parse();
@@ -62,44 +134,34 @@ fn main() {
let exc = ex.clone();
run_executor(
async {
- let monitor = backend.monitor();
- let conn_listener = monitor.conn_events().await;
- let peerpool_listener = monitor.peer_pool_events().await;
- let discovery_listener = monitor.discovery_events().await;
-
- let monitor_task = exc.spawn(async move {
- loop {
- let event = conn_listener.recv().await.unwrap();
- println!("New connection event: {}", event);
- }
- });
-
- let monitor_task2 = exc.spawn(async move {
- loop {
- let event = peerpool_listener.recv().await.unwrap();
- println!("New peer pool event: {}", event);
- }
+ // RPC service
+ let service = Arc::new(MonitorRPC {
+ backend: backend.clone(),
});
- let monitor_task3 = exc.spawn(async move {
- loop {
- let event = discovery_listener.recv().await.unwrap();
- println!("New discovery event: {}", event);
- }
- });
+ // Create rpc server
+ let server = Server::builder(cli.rpc_endpoint)
+ .expect("Create server builder")
+ .service(service.clone())
+ .pubsub_service(service)
+ .build_with_executor(exc.clone().into())
+ .await
+ .expect("Build rpc server");
// Run the backend
- backend.run().await.unwrap();
+ backend.run().await.expect("Run p2p backend");
+
+ // Run the RPC server
+ server.start().await;
// Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ ctrlc_r.recv().await.expect("Wait for ctrlc signal");
// Shutdown the backend
backend.shutdown().await;
- monitor_task.cancel().await;
- monitor_task2.cancel().await;
- monitor_task3.cancel().await;
+ // Shutdown the RPC server
+ server.shutdown().await;
},
ex,
);
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index 0db1cee..98297e5 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -25,6 +25,9 @@ pub struct Backend {
/// Identity Key pair
key_pair: KeyPair,
+ /// Peer ID
+ peer_id: PeerID,
+
/// Responsible for network and system monitoring.
monitor: Arc<Monitor>,
@@ -65,6 +68,7 @@ impl Backend {
Arc::new(Self {
key_pair: key_pair.clone(),
+ peer_id,
monitor,
discovery,
config,
@@ -97,8 +101,13 @@ impl Backend {
self.config.clone()
}
+ /// Returns the `PeerID`.
+ pub fn peer_id(&self) -> &PeerID {
+ &self.peer_id
+ }
+
/// Returns the `KeyPair`.
- pub async fn key_pair(&self) -> &KeyPair {
+ pub fn key_pair(&self) -> &KeyPair {
&self.key_pair
}