author    hozan23 <hozan23@karyontech.net>  2024-05-27 00:59:23 +0200
committer hozan23 <hozan23@karyontech.net>  2024-05-27 00:59:23 +0200
commit    d1c816660c0583db33d160e2ef3e980bef0d5a85 (patch)
tree      9eb06e6dbfbe34c6c2f85eee8d2e337b155be103 /p2p/examples/monitor/src
parent    385d53ec53e750e342cce78edb793958edf5133e (diff)
p2p: WIP rpc server implementation for the p2p monitor
Diffstat (limited to 'p2p/examples/monitor/src')
-rw-r--r--  p2p/examples/monitor/src/client.rs   48
-rw-r--r--  p2p/examples/monitor/src/main.rs    335
-rw-r--r--  p2p/examples/monitor/src/shared.rs   31
3 files changed, 414 insertions, 0 deletions
diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs
new file mode 100644
index 0000000..d4970eb
--- /dev/null
+++ b/p2p/examples/monitor/src/client.rs
@@ -0,0 +1,48 @@
+use clap::Parser;
+
+use karyon_jsonrpc::Client;
+use karyon_p2p::endpoint::Endpoint;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+ /// RPC server endpoint.
+ #[arg(short)]
+ rpc_endpoint: Endpoint,
+}
+
+fn main() {
+ smol::block_on(async {
+ env_logger::init();
+ let cli = Cli::parse();
+
+ let rpc = Client::builder(cli.rpc_endpoint)
+ .expect("Create rpc client builder")
+ .build()
+ .await
+ .expect("Create rpc client");
+
+ let (_, sub) = rpc
+ .subscribe("MonitorRPC.conn_subscribe", ())
+ .await
+ .expect("Subscribe to connection events");
+
+ let (_, sub2) = rpc
+ .subscribe("MonitorRPC.peer_pool_subscribe", ())
+ .await
+ .expect("Subscribe to peer pool events");
+
+ smol::spawn(async move {
+ loop {
+ let _event = sub.recv().await.expect("Receive connection event");
+ }
+ })
+ .detach();
+
+ smol::spawn(async move {
+ loop {
+ let _event = sub2.recv().await.expect("Receive peer pool event");
+ }
+ }).await;
+ });
+}
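
The client above throws away every payload it receives (`let _event = …`). Below is a minimal sketch (not part of this commit) of the first receive loop rewritten to log the payload; it assumes `sub.recv()` yields the `serde_json::Value` that the server publishes via `sub.notify(serde_json::json!(event))`.

// Sketch only: a connection-event loop that logs each payload instead of
// discarding it. Assumes the subscription yields the serde_json::Value
// produced by the server-side notify() calls.
smol::spawn(async move {
    loop {
        let event = sub.recv().await.expect("Receive connection event");
        println!("connection event: {event}");
    }
})
.detach();
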
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
new file mode 100644
index 0000000..636d652
--- /dev/null
+++ b/p2p/examples/monitor/src/main.rs
@@ -0,0 +1,335 @@
+mod shared;
+
+use std::sync::Arc;
+
+use clap::Parser;
+use log::error;
+use ringbuffer::{AllocRingBuffer, RingBuffer};
+use serde::{Deserialize, Serialize};
+use smol::{channel, lock::Mutex, Executor};
+
+use karyon_core::async_util::{CondWait, TaskGroup, TaskResult};
+use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, Subscription, SubscriptionID};
+use karyon_p2p::{
+ endpoint::{Endpoint, Port},
+ keypair::{KeyPair, KeyPairType},
+ monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent},
+ ArcBackend, Backend, Config, Error, Result,
+};
+
+use shared::run_executor;
+
+const EVENT_BUFFER_SIZE: usize = 30;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+ /// Optional list of bootstrap peers to start the seeding process.
+ #[arg(short)]
+ bootstrap_peers: Vec<Endpoint>,
+
+ /// RPC server endpoint.
+ #[arg(short)]
+ rpc_endpoint: Endpoint,
+
+ /// Optional list of peer endpoints for manual connections.
+ #[arg(short)]
+ peer_endpoints: Vec<Endpoint>,
+
+ /// Optional endpoint for accepting incoming connections.
+ #[arg(short)]
+ listen_endpoint: Option<Endpoint>,
+
+ /// Optional TCP/UDP port for the discovery service.
+ #[arg(short)]
+ discovery_port: Option<Port>,
+}
+
+struct MonitorRPC {
+ backend: ArcBackend,
+ conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>,
+ pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>,
+ discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>,
+ conn_event_condvar: Arc<CondWait>,
+ pp_event_condvar: Arc<CondWait>,
+ discv_event_condvar: Arc<CondWait>,
+ task_group: TaskGroup,
+}
+
+impl MonitorRPC {
+ fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
+ Arc::new(MonitorRPC {
+ backend,
+ conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
+ pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
+ discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
+ conn_event_condvar: Arc::new(CondWait::new()),
+ pp_event_condvar: Arc::new(CondWait::new()),
+ discv_event_condvar: Arc::new(CondWait::new()),
+ task_group: TaskGroup::with_executor(ex.into()),
+ })
+ }
+
+ async fn run(&self) -> Result<()> {
+ let conn_events = self.backend.monitor().conn_events().await;
+ let peer_pool_events = self.backend.monitor().peer_pool_events().await;
+ let discovery_events = self.backend.monitor().discovery_events().await;
+
+ let conn_event_buffer = self.conn_event_buffer.clone();
+ let pp_event_buffer = self.pp_event_buffer.clone();
+ let discv_event_buffer = self.discv_event_buffer.clone();
+
+ let conn_event_condvar = self.conn_event_condvar.clone();
+ let pp_event_condvar = self.pp_event_condvar.clone();
+ let discv_event_condvar = self.discv_event_condvar.clone();
+
+ let on_failure = |res: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ error!("Event receive loop: {err}")
+ }
+ };
+
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = conn_events.recv().await?;
+ conn_event_buffer.lock().await.push(event);
+ conn_event_condvar.broadcast().await;
+ }
+ },
+ on_failure,
+ );
+
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = peer_pool_events.recv().await?;
+ pp_event_buffer.lock().await.push(event);
+ pp_event_condvar.broadcast().await;
+ }
+ },
+ on_failure,
+ );
+
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = discovery_events.recv().await?;
+ discv_event_buffer.lock().await.push(event);
+ discv_event_condvar.broadcast().await;
+ }
+ },
+ on_failure,
+ );
+
+ Ok(())
+ }
+
+ async fn shutdown(&self) {
+ self.task_group.cancel().await;
+ }
+}
+
+#[rpc_impl]
+impl MonitorRPC {
+ async fn peer_id(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.peer_id().to_string()))
+ }
+
+ async fn inbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+
+ async fn outbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.outbound_slots()))
+ }
+}
+
+#[rpc_pubsub_impl]
+impl MonitorRPC {
+ async fn conn_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+
+ let cond_wait = self.conn_event_condvar.clone();
+ let buffer = self.conn_event_buffer.clone();
+ self.task_group
+ .spawn(notify(sub, cond_wait, buffer), notify_failed);
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn peer_pool_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+
+ let cond_wait = self.pp_event_condvar.clone();
+ let buffer = self.pp_event_buffer.clone();
+ self.task_group
+ .spawn(notify(sub, cond_wait, buffer), notify_failed);
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn discovery_subscribe(
+ &self,
+ chan: ArcChannel,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id.clone();
+
+ let cond_wait = self.discv_event_condvar.clone();
+ let buffer = self.discv_event_buffer.clone();
+ self.task_group
+ .spawn(notify(sub, cond_wait, buffer), notify_failed);
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ async fn conn_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ async fn peer_pool_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ async fn discovery_unsubscribe(
+ &self,
+ chan: ArcChannel,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
+fn main() {
+ env_logger::init();
+ let cli = Cli::parse();
+
+ let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
+
+ // Create the configuration for the backend.
+ let config = Config {
+ listen_endpoint: cli.listen_endpoint,
+ peer_endpoints: cli.peer_endpoints,
+ bootstrap_peers: cli.bootstrap_peers,
+ discovery_port: cli.discovery_port.unwrap_or(0),
+ enable_monitor: true,
+ ..Default::default()
+ };
+
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
+ // Create a new Backend
+ let backend = Backend::new(&key_pair, config, ex.clone().into());
+
+ let (ctrlc_s, ctrlc_r) = channel::unbounded();
+ let handle = move || ctrlc_s.try_send(()).unwrap();
+ ctrlc::set_handler(handle).unwrap();
+
+ let exc = ex.clone();
+ run_executor(
+ async {
+ // RPC service
+ let service = MonitorRPC::new(backend.clone(), exc.clone());
+
+ // Create rpc server
+ let server = Server::builder(cli.rpc_endpoint)
+ .expect("Create server builder")
+ .service(service.clone())
+ .pubsub_service(service.clone())
+ .build_with_executor(exc.clone().into())
+ .await
+ .expect("Build rpc server");
+
+ // Run the RPC server
+ server.start().await;
+
+ // Run the RPC Service
+ service.run().await.expect("Run monitor rpc service");
+
+ // Run the backend
+ backend.run().await.expect("Run p2p backend");
+
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.expect("Wait for ctrlc signal");
+
+ // Shutdown the backend
+ backend.shutdown().await;
+
+ // Shutdown the RPC server
+ server.shutdown().await;
+
+ // Shutdown the RPC service
+ service.shutdown().await;
+ },
+ ex,
+ );
+}
+
+async fn notify<T: Serialize + Deserialize<'static> + Clone>(
+ sub: Subscription,
+ cond_wait: Arc<CondWait>,
+ buffer: Arc<Mutex<AllocRingBuffer<T>>>,
+) -> Result<()> {
+ for event in buffer.lock().await.iter() {
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ return Err(Error::Other(format!("failed to notify: {err}")));
+ }
+ }
+ loop {
+ cond_wait.wait().await;
+ cond_wait.reset().await;
+ if let Some(event) = buffer.lock().await.back().cloned() {
+ if let Err(err) = sub.notify(serde_json::json!(event)).await {
+ return Err(Error::Other(format!("failed to notify: {err}")));
+ }
+ }
+ }
+}
+
+async fn notify_failed(result: TaskResult<Result<()>>) {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Error: {err}");
+ }
+}
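
client.rs only exercises the pub-sub methods. As a complement, here is a minimal sketch (not part of this commit) of querying the plain `#[rpc_impl]` methods registered above. It assumes that `karyon_jsonrpc::Client` exposes a `call` method alongside `subscribe`, and that the slot counts returned by `inbound_slots()`/`outbound_slots()` deserialize as plain integers.

// Sketch only: query the MonitorRPC request/response methods.
// Assumes Client::call(method, params) exists and returns a value that
// can be deserialized into the annotated type.
async fn query_monitor(rpc: &karyon_jsonrpc::Client) {
    // peer_id is serialized as a JSON string on the server side.
    let peer_id: String = rpc
        .call("MonitorRPC.peer_id", ())
        .await
        .expect("Call peer_id");

    // Slot counts are serialized with json!, assumed here to be integers.
    let inbound: u64 = rpc
        .call("MonitorRPC.inbound_connection", ())
        .await
        .expect("Call inbound_connection");
    let outbound: u64 = rpc
        .call("MonitorRPC.outbound_connection", ())
        .await
        .expect("Call outbound_connection");

    println!("peer {peer_id}: {inbound} inbound / {outbound} outbound slots");
}
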
diff --git a/p2p/examples/monitor/src/shared.rs b/p2p/examples/monitor/src/shared.rs
new file mode 100644
index 0000000..0e8079c
--- /dev/null
+++ b/p2p/examples/monitor/src/shared.rs
@@ -0,0 +1,31 @@
+use std::{num::NonZeroUsize, sync::Arc, thread};
+
+use easy_parallel::Parallel;
+use smol::{channel, future, future::Future, Executor};
+
+/// Returns an estimate of the default amount of parallelism a program should use.
+/// See [`std::thread::available_parallelism`].
+pub fn available_parallelism() -> usize {
+ thread::available_parallelism()
+ .map(NonZeroUsize::get)
+ .unwrap_or(1)
+}
+
+/// Run `main_future` to completion on a multi-threaded executor.
+pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) {
+ let (signal, shutdown) = channel::unbounded::<()>();
+
+ let num_threads = available_parallelism();
+
+ Parallel::new()
+ .each(0..(num_threads), |_| {
+ future::block_on(ex.run(shutdown.recv()))
+ })
+ // Run the main future on the current thread.
+ .finish(|| {
+ future::block_on(async {
+ main_future.await;
+ drop(signal);
+ })
+ });
+}
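
For reference, run_executor is driven from main.rs as shown above: a shared smol Executor is handed to both the p2p/RPC machinery and to run_executor, which blocks the current thread on the main future while the worker threads keep polling the executor until the shutdown signal is dropped. A stripped-down, hypothetical usage sketch:

// Hypothetical standalone usage of shared::run_executor (mirrors main.rs).
use std::sync::Arc;

use smol::Executor;

mod shared;

fn main() {
    let ex = Arc::new(Executor::new());
    shared::run_executor(
        async {
            // Real code would spawn tasks on a clone of `ex`; the worker
            // threads keep polling the executor until this future completes
            // and the shutdown signal is dropped.
            println!("running on {} threads", shared::available_parallelism());
        },
        ex,
    );
}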