aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples/monitor/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/examples/monitor/src/main.rs')
-rw-r--r--p2p/examples/monitor/src/main.rs240
1 files changed, 5 insertions, 235 deletions
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
index c57d06c..990f8d2 100644
--- a/p2p/examples/monitor/src/main.rs
+++ b/p2p/examples/monitor/src/main.rs
@@ -1,28 +1,21 @@
+mod service;
mod shared;
use std::sync::Arc;
use clap::Parser;
-use log::error;
-use ringbuffer::{AllocRingBuffer, RingBuffer};
-use serde::{Deserialize, Serialize};
-use smol::{channel, lock::Mutex, Executor};
+use smol::{channel, Executor};
-use karyon_core::async_util::{CondWait, TaskGroup, TaskResult};
-use karyon_jsonrpc::{
- message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Server, Subscription,
-};
+use karyon_jsonrpc::Server;
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
- monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent},
- ArcBackend, Backend, Config, Error, Result,
+ Backend, Config,
};
+use service::MonitorRPC;
use shared::run_executor;
-const EVENT_BUFFER_SIZE: usize = 30;
-
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -47,202 +40,6 @@ struct Cli {
discovery_port: Option<Port>,
}
-struct MonitorRPC {
- backend: ArcBackend,
- conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>,
- pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>,
- discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>,
- conn_event_condvar: Arc<CondWait>,
- pp_event_condvar: Arc<CondWait>,
- discv_event_condvar: Arc<CondWait>,
- task_group: TaskGroup,
-}
-
-impl MonitorRPC {
- fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
- Arc::new(MonitorRPC {
- backend,
- conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- conn_event_condvar: Arc::new(CondWait::new()),
- pp_event_condvar: Arc::new(CondWait::new()),
- discv_event_condvar: Arc::new(CondWait::new()),
- task_group: TaskGroup::with_executor(ex.into()),
- })
- }
-
- async fn run(&self) -> Result<()> {
- let conn_events = self.backend.monitor().conn_events().await;
- let peer_pool_events = self.backend.monitor().peer_pool_events().await;
- let discovery_events = self.backend.monitor().discovery_events().await;
-
- let conn_event_buffer = self.conn_event_buffer.clone();
- let pp_event_buffer = self.pp_event_buffer.clone();
- let discv_event_buffer = self.discv_event_buffer.clone();
-
- let conn_event_condvar = self.conn_event_condvar.clone();
- let pp_event_condvar = self.pp_event_condvar.clone();
- let discv_event_condvar = self.discv_event_condvar.clone();
-
- let on_failuer = |res: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Event receive loop: {err}")
- }
- };
-
- self.task_group.spawn(
- async move {
- loop {
- let event = conn_events.recv().await?;
- conn_event_buffer.lock().await.push(event);
- conn_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- self.task_group.spawn(
- async move {
- loop {
- let event = peer_pool_events.recv().await?;
- pp_event_buffer.lock().await.push(event);
- pp_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- self.task_group.spawn(
- async move {
- loop {
- let event = discovery_events.recv().await?;
- discv_event_buffer.lock().await.push(event);
- discv_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- Ok(())
- }
-
- async fn shutdown(&self) {
- self.task_group.cancel().await;
- }
-}
-
-#[rpc_impl]
-impl MonitorRPC {
- async fn peer_id(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.peer_id().to_string()))
- }
-
- async fn inbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.inbound_slots()))
- }
-
- async fn outbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.outbound_slots()))
- }
-}
-
-#[rpc_pubsub_impl]
-impl MonitorRPC {
- async fn conn_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.conn_event_condvar.clone();
- let buffer = self.conn_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn peer_pool_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.pp_event_condvar.clone();
- let buffer = self.pp_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn discovery_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.discv_event_condvar.clone();
- let buffer = self.discv_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn conn_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- async fn peer_pool_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- async fn discovery_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-}
-
fn main() {
env_logger::init();
@@ -309,30 +106,3 @@ fn main() {
ex,
);
}
-
-async fn notify<T: Serialize + Deserialize<'static> + Clone>(
- sub: Subscription,
- cond_wait: Arc<CondWait>,
- buffer: Arc<Mutex<AllocRingBuffer<T>>>,
-) -> Result<()> {
- for event in buffer.lock().await.iter() {
- if let Err(err) = sub.notify(serde_json::json!(event)).await {
- return Err(Error::Other(format!("failed to notify: {err}")));
- }
- }
- loop {
- cond_wait.wait().await;
- cond_wait.reset().await;
- if let Some(event) = buffer.lock().await.back().cloned() {
- if let Err(err) = sub.notify(serde_json::json!(event)).await {
- return Err(Error::Other(format!("failed to notify: {err}")));
- }
- }
- }
-}
-
-async fn notify_failed(result: TaskResult<Result<()>>) {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Error: {err}");
- }
-}