diff options
author | hozan23 <hozan23@karyontech.net> | 2024-06-14 02:04:43 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-06-14 02:04:43 +0200 |
commit | 0c0699c0460c1b149915729223eec701bde481df (patch) | |
tree | b9ada7e1bd288684eb8fdf690ea1034520ab9f16 /p2p/examples/monitor/src/main.rs | |
parent | 60a947f6e857f0aa5ae5e8c3b0a183577f74a9f4 (diff) |
p2p: WIP implement rpc server for the p2p monitor
Diffstat (limited to 'p2p/examples/monitor/src/main.rs')
-rw-r--r-- | p2p/examples/monitor/src/main.rs | 240 |
1 files changed, 5 insertions, 235 deletions
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs index c57d06c..990f8d2 100644 --- a/p2p/examples/monitor/src/main.rs +++ b/p2p/examples/monitor/src/main.rs @@ -1,28 +1,21 @@ +mod service; mod shared; use std::sync::Arc; use clap::Parser; -use log::error; -use ringbuffer::{AllocRingBuffer, RingBuffer}; -use serde::{Deserialize, Serialize}; -use smol::{channel, lock::Mutex, Executor}; +use smol::{channel, Executor}; -use karyon_core::async_util::{CondWait, TaskGroup, TaskResult}; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Server, Subscription, -}; +use karyon_jsonrpc::Server; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, - monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, - ArcBackend, Backend, Config, Error, Result, + Backend, Config, }; +use service::MonitorRPC; use shared::run_executor; -const EVENT_BUFFER_SIZE: usize = 30; - #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -47,202 +40,6 @@ struct Cli { discovery_port: Option<Port>, } -struct MonitorRPC { - backend: ArcBackend, - conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>, - pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>, - discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>, - conn_event_condvar: Arc<CondWait>, - pp_event_condvar: Arc<CondWait>, - discv_event_condvar: Arc<CondWait>, - task_group: TaskGroup, -} - -impl MonitorRPC { - fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> { - Arc::new(MonitorRPC { - backend, - conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - conn_event_condvar: Arc::new(CondWait::new()), - pp_event_condvar: Arc::new(CondWait::new()), - discv_event_condvar: Arc::new(CondWait::new()), - task_group: TaskGroup::with_executor(ex.into()), - }) - } - - async fn run(&self) -> Result<()> { - let conn_events = self.backend.monitor().conn_events().await; - let peer_pool_events = self.backend.monitor().peer_pool_events().await; - let discovery_events = self.backend.monitor().discovery_events().await; - - let conn_event_buffer = self.conn_event_buffer.clone(); - let pp_event_buffer = self.pp_event_buffer.clone(); - let discv_event_buffer = self.discv_event_buffer.clone(); - - let conn_event_condvar = self.conn_event_condvar.clone(); - let pp_event_condvar = self.pp_event_condvar.clone(); - let discv_event_condvar = self.discv_event_condvar.clone(); - - let on_failuer = |res: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Event receive loop: {err}") - } - }; - - self.task_group.spawn( - async move { - loop { - let event = conn_events.recv().await?; - conn_event_buffer.lock().await.push(event); - conn_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - self.task_group.spawn( - async move { - loop { - let event = peer_pool_events.recv().await?; - pp_event_buffer.lock().await.push(event); - pp_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - self.task_group.spawn( - async move { - loop { - let event = discovery_events.recv().await?; - discv_event_buffer.lock().await.push(event); - discv_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - Ok(()) - } - - async fn shutdown(&self) { - self.task_group.cancel().await; - } -} - -#[rpc_impl] -impl MonitorRPC { - async fn peer_id( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.peer_id().to_string())) - } - - async fn inbound_connection( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.inbound_slots())) - } - - async fn outbound_connection( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.outbound_slots())) - } -} - -#[rpc_pubsub_impl] -impl MonitorRPC { - async fn conn_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.conn_event_condvar.clone(); - let buffer = self.conn_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn peer_pool_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.pp_event_condvar.clone(); - let buffer = self.pp_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn discovery_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.discv_event_condvar.clone(); - let buffer = self.discv_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn conn_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - async fn peer_pool_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - async fn discovery_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } -} - fn main() { env_logger::init(); @@ -309,30 +106,3 @@ fn main() { ex, ); } - -async fn notify<T: Serialize + Deserialize<'static> + Clone>( - sub: Subscription, - cond_wait: Arc<CondWait>, - buffer: Arc<Mutex<AllocRingBuffer<T>>>, -) -> Result<()> { - for event in buffer.lock().await.iter() { - if let Err(err) = sub.notify(serde_json::json!(event)).await { - return Err(Error::Other(format!("failed to notify: {err}"))); - } - } - loop { - cond_wait.wait().await; - cond_wait.reset().await; - if let Some(event) = buffer.lock().await.back().cloned() { - if let Err(err) = sub.notify(serde_json::json!(event)).await { - return Err(Error::Other(format!("failed to notify: {err}"))); - } - } - } -} - -async fn notify_failed(result: TaskResult<Result<()>>) { - if let TaskResult::Completed(Err(err)) = result { - error!("Error: {err}"); - } -} |