diff options
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | net/src/codec/websocket.rs | 2 | ||||
-rw-r--r-- | net/src/endpoint.rs | 6 | ||||
-rw-r--r-- | net/src/stream/websocket.rs | 5 | ||||
-rw-r--r-- | p2p/Cargo.toml | 4 | ||||
-rw-r--r-- | p2p/examples/monitor.rs | 120 | ||||
-rw-r--r-- | p2p/src/backend.rs | 11 |
7 files changed, 117 insertions, 33 deletions
@@ -1319,12 +1319,14 @@ dependencies = [ "futures-rustls", "futures-util", "karyon_core", + "karyon_jsonrpc", "karyon_net", "log", "rand", "rcgen 0.12.1", "rustls-pki-types", "semver", + "serde_json", "sha2", "smol", "thiserror", diff --git a/net/src/codec/websocket.rs b/net/src/codec/websocket.rs index b59a55c..8676810 100644 --- a/net/src/codec/websocket.rs +++ b/net/src/codec/websocket.rs @@ -19,5 +19,5 @@ pub trait WebSocketEncoder { pub trait WebSocketDecoder { type DeItem; - fn decode(&self, src: &Message) -> Result<Self::DeItem>; + fn decode(&self, src: &Message) -> Result<Option<Self::DeItem>>; } diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs index 0c7ecd1..5aebdf9 100644 --- a/net/src/endpoint.rs +++ b/net/src/endpoint.rs @@ -238,6 +238,12 @@ impl ToEndpoint for String { } } +impl ToEndpoint for Endpoint { + fn to_endpoint(&self) -> Result<Endpoint> { + Ok(self.clone()) + } +} + impl ToEndpoint for &str { fn to_endpoint(&self) -> Result<Endpoint> { Endpoint::from_str(self) diff --git a/net/src/stream/websocket.rs b/net/src/stream/websocket.rs index 9d41626..2626d2f 100644 --- a/net/src/stream/websocket.rs +++ b/net/src/stream/websocket.rs @@ -47,7 +47,10 @@ where pub async fn recv(&mut self) -> Result<C::Item> { match self.inner.next().await { - Some(msg) => self.codec.decode(&msg?), + Some(msg) => match self.codec.decode(&msg?)? { + Some(m) => Ok(m), + None => todo!(), + }, None => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())), } } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index b0d0232..83df9c0 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "karyon_p2p" -description = "A lightweight, extensible, and customizable p2p network stack." +description = "A lightweight, extensible, and customizable p2p network stack." version.workspace = true edition.workspace = true homepage.workspace = true @@ -56,3 +56,5 @@ ctrlc = "3.4.4" easy-parallel = "3.3.1" env_logger = "0.11.3" smol = "2.0.0" +karyon_jsonrpc = { workspace = true, features = ["ws", "smol"] } +serde_json = "1.0.117" diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index 32c8959..5382781 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -3,14 +3,17 @@ mod shared; use std::sync::Arc; use clap::Parser; +use log::error; use smol::{channel, Executor}; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, - Backend, Config, + ArcBackend, Backend, Config, }; +use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, SubscriptionID}; + use shared::run_executor; #[derive(Parser)] @@ -20,6 +23,10 @@ struct Cli { #[arg(short)] bootstrap_peers: Vec<Endpoint>, + /// RPC server endpoint. + #[arg(short)] + rpc_endpoint: Endpoint, + /// Optional list of peer endpoints for manual connections. #[arg(short)] peer_endpoints: Vec<Endpoint>, @@ -33,6 +40,71 @@ struct Cli { discovery_port: Option<Port>, } +struct MonitorRPC { + backend: ArcBackend, +} + +#[rpc_impl] +impl MonitorRPC { + async fn peer_id( + &self, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + Ok(serde_json::json!(self.backend.peer_id().to_string())) + } + + async fn inbound_connection( + &self, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + Ok(serde_json::json!(self.backend.inbound_slots())) + } + + async fn outbound_connection( + &self, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + Ok(serde_json::json!(self.backend.inbound_slots())) + } +} + +#[rpc_pubsub_impl] +impl MonitorRPC { + async fn conn_subscribe( + &self, + chan: ArcChannel, + method: String, + _params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id.clone(); + let conn_events = self.backend.monitor().conn_events().await; + smol::spawn(async move { + loop { + let _event = conn_events.recv().await; + if let Err(err) = sub.notify(serde_json::json!("event")).await { + error!("Failed to notify: {err}"); + break; + } + } + }) + .detach(); + + Ok(serde_json::json!(sub_id)) + } + + async fn conn_unsubscribe( + &self, + chan: ArcChannel, + _method: String, + params: serde_json::Value, + ) -> Result<serde_json::Value, karyon_jsonrpc::Error> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + fn main() { env_logger::init(); let cli = Cli::parse(); @@ -62,44 +134,34 @@ fn main() { let exc = ex.clone(); run_executor( async { - let monitor = backend.monitor(); - let conn_listener = monitor.conn_events().await; - let peerpool_listener = monitor.peer_pool_events().await; - let discovery_listener = monitor.discovery_events().await; - - let monitor_task = exc.spawn(async move { - loop { - let event = conn_listener.recv().await.unwrap(); - println!("New connection event: {}", event); - } - }); - - let monitor_task2 = exc.spawn(async move { - loop { - let event = peerpool_listener.recv().await.unwrap(); - println!("New peer pool event: {}", event); - } + // RPC service + let service = Arc::new(MonitorRPC { + backend: backend.clone(), }); - let monitor_task3 = exc.spawn(async move { - loop { - let event = discovery_listener.recv().await.unwrap(); - println!("New discovery event: {}", event); - } - }); + // Create rpc server + let server = Server::builder(cli.rpc_endpoint) + .expect("Create server builder") + .service(service.clone()) + .pubsub_service(service) + .build_with_executor(exc.clone().into()) + .await + .expect("Build rpc server"); // Run the backend - backend.run().await.unwrap(); + backend.run().await.expect("Run p2p backend"); + + // Run the RPC server + server.start().await; // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + ctrlc_r.recv().await.expect("Wait for ctrlc signal"); // Shutdown the backend backend.shutdown().await; - monitor_task.cancel().await; - monitor_task2.cancel().await; - monitor_task3.cancel().await; + // Shutdown the RPC server + server.shutdown().await; }, ex, ); diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index 0db1cee..98297e5 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -25,6 +25,9 @@ pub struct Backend { /// Identity Key pair key_pair: KeyPair, + /// Peer ID + peer_id: PeerID, + /// Responsible for network and system monitoring. monitor: Arc<Monitor>, @@ -65,6 +68,7 @@ impl Backend { Arc::new(Self { key_pair: key_pair.clone(), + peer_id, monitor, discovery, config, @@ -97,8 +101,13 @@ impl Backend { self.config.clone() } + /// Returns the `PeerID`. + pub fn peer_id(&self) -> &PeerID { + &self.peer_id + } + /// Returns the `KeyPair`. - pub async fn key_pair(&self) -> &KeyPair { + pub fn key_pair(&self) -> &KeyPair { &self.key_pair } |