diff options
Diffstat (limited to 'p2p/examples/monitor')
-rw-r--r-- | p2p/examples/monitor/Cargo.lock | 43 | ||||
-rw-r--r-- | p2p/examples/monitor/Cargo.toml | 1 | ||||
-rw-r--r-- | p2p/examples/monitor/src/client.rs | 36 | ||||
-rw-r--r-- | p2p/examples/monitor/src/main.rs | 240 | ||||
-rw-r--r-- | p2p/examples/monitor/src/service.rs | 309 |
5 files changed, 391 insertions, 238 deletions
diff --git a/p2p/examples/monitor/Cargo.lock b/p2p/examples/monitor/Cargo.lock index d881b93..25defa2 100644 --- a/p2p/examples/monitor/Cargo.lock +++ b/p2p/examples/monitor/Cargo.lock @@ -931,12 +931,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-channel" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -946,6 +962,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -980,6 +1007,17 @@ dependencies = [ ] [[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] name = "futures-rustls" version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1008,9 +1046,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1386,6 +1428,7 @@ dependencies = [ "ctrlc", "easy-parallel", "env_logger", + "futures", "karyon_core", "karyon_jsonrpc", "karyon_p2p", diff --git a/p2p/examples/monitor/Cargo.toml b/p2p/examples/monitor/Cargo.toml index b323ed1..52bab15 100644 --- a/p2p/examples/monitor/Cargo.toml +++ b/p2p/examples/monitor/Cargo.toml @@ -18,6 +18,7 @@ smol = "2.0.0" serde_json = "1.0.117" easy-parallel = "3.3.1" ringbuffer = "0.15.0" +futures = "0.3.30" [[bin]] name = "client" diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs index b81c286..2f4d00d 100644 --- a/p2p/examples/monitor/src/client.rs +++ b/p2p/examples/monitor/src/client.rs @@ -1,4 +1,5 @@ use clap::Parser; +use serde::{Deserialize, Serialize}; use karyon_jsonrpc::Client; use karyon_p2p::endpoint::Endpoint; @@ -11,6 +12,9 @@ struct Cli { rpc_endpoint: Endpoint, } +#[derive(Deserialize, Serialize)] +struct Pong {} + fn main() { smol::block_on(async { env_logger::init(); @@ -32,18 +36,44 @@ fn main() { .await .expect("Subscribe to peer pool events"); + let (_, sub3) = rpc + .subscribe("MonitorRPC.discovery_subscribe", ()) + .await + .expect("Subscribe to discovery events"); + + smol::spawn(async move { + loop { + let event = sub.recv().await.expect("Receive connection event"); + println!("Receive new connection event: {event}"); + } + }) + .detach(); + smol::spawn(async move { loop { - let _event = sub.recv().await.expect("Receive connection event"); + let event = sub2.recv().await.expect("Receive peer pool event"); + println!("Receive new peerpool event: {event}"); } }) .detach(); smol::spawn(async move { loop { - let _event = sub2.recv().await.expect("Receive peer pool event"); + let event = sub3.recv().await.expect("Receive discovery event"); + println!("Receive new discovery event: {event}"); } }) - .await; + .detach(); + + // start ping-pong loop + loop { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + let _: Pong = rpc + .call("MonitorRPC.ping", ()) + .await + .expect("Receive pong message"); + + println!("Receive pong message"); + } }); } diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs index c57d06c..990f8d2 100644 --- a/p2p/examples/monitor/src/main.rs +++ b/p2p/examples/monitor/src/main.rs @@ -1,28 +1,21 @@ +mod service; mod shared; use std::sync::Arc; use clap::Parser; -use log::error; -use ringbuffer::{AllocRingBuffer, RingBuffer}; -use serde::{Deserialize, Serialize}; -use smol::{channel, lock::Mutex, Executor}; +use smol::{channel, Executor}; -use karyon_core::async_util::{CondWait, TaskGroup, TaskResult}; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Server, Subscription, -}; +use karyon_jsonrpc::Server; use karyon_p2p::{ endpoint::{Endpoint, Port}, keypair::{KeyPair, KeyPairType}, - monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, - ArcBackend, Backend, Config, Error, Result, + Backend, Config, }; +use service::MonitorRPC; use shared::run_executor; -const EVENT_BUFFER_SIZE: usize = 30; - #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -47,202 +40,6 @@ struct Cli { discovery_port: Option<Port>, } -struct MonitorRPC { - backend: ArcBackend, - conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>, - pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>, - discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>, - conn_event_condvar: Arc<CondWait>, - pp_event_condvar: Arc<CondWait>, - discv_event_condvar: Arc<CondWait>, - task_group: TaskGroup, -} - -impl MonitorRPC { - fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> { - Arc::new(MonitorRPC { - backend, - conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), - conn_event_condvar: Arc::new(CondWait::new()), - pp_event_condvar: Arc::new(CondWait::new()), - discv_event_condvar: Arc::new(CondWait::new()), - task_group: TaskGroup::with_executor(ex.into()), - }) - } - - async fn run(&self) -> Result<()> { - let conn_events = self.backend.monitor().conn_events().await; - let peer_pool_events = self.backend.monitor().peer_pool_events().await; - let discovery_events = self.backend.monitor().discovery_events().await; - - let conn_event_buffer = self.conn_event_buffer.clone(); - let pp_event_buffer = self.pp_event_buffer.clone(); - let discv_event_buffer = self.discv_event_buffer.clone(); - - let conn_event_condvar = self.conn_event_condvar.clone(); - let pp_event_condvar = self.pp_event_condvar.clone(); - let discv_event_condvar = self.discv_event_condvar.clone(); - - let on_failuer = |res: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Event receive loop: {err}") - } - }; - - self.task_group.spawn( - async move { - loop { - let event = conn_events.recv().await?; - conn_event_buffer.lock().await.push(event); - conn_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - self.task_group.spawn( - async move { - loop { - let event = peer_pool_events.recv().await?; - pp_event_buffer.lock().await.push(event); - pp_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - self.task_group.spawn( - async move { - loop { - let event = discovery_events.recv().await?; - discv_event_buffer.lock().await.push(event); - discv_event_condvar.broadcast().await; - } - }, - on_failuer, - ); - - Ok(()) - } - - async fn shutdown(&self) { - self.task_group.cancel().await; - } -} - -#[rpc_impl] -impl MonitorRPC { - async fn peer_id( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.peer_id().to_string())) - } - - async fn inbound_connection( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.inbound_slots())) - } - - async fn outbound_connection( - &self, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - Ok(serde_json::json!(self.backend.outbound_slots())) - } -} - -#[rpc_pubsub_impl] -impl MonitorRPC { - async fn conn_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.conn_event_condvar.clone(); - let buffer = self.conn_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn peer_pool_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.pp_event_condvar.clone(); - let buffer = self.pp_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn discovery_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - let cond_wait = self.discv_event_condvar.clone(); - let buffer = self.discv_event_buffer.clone(); - self.task_group - .spawn(notify(sub, cond_wait, buffer), notify_failed); - - Ok(serde_json::json!(sub_id)) - } - - async fn conn_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - async fn peer_pool_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - async fn discovery_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: serde_json::Value, - ) -> karyon_jsonrpc::Result<serde_json::Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } -} - fn main() { env_logger::init(); @@ -309,30 +106,3 @@ fn main() { ex, ); } - -async fn notify<T: Serialize + Deserialize<'static> + Clone>( - sub: Subscription, - cond_wait: Arc<CondWait>, - buffer: Arc<Mutex<AllocRingBuffer<T>>>, -) -> Result<()> { - for event in buffer.lock().await.iter() { - if let Err(err) = sub.notify(serde_json::json!(event)).await { - return Err(Error::Other(format!("failed to notify: {err}"))); - } - } - loop { - cond_wait.wait().await; - cond_wait.reset().await; - if let Some(event) = buffer.lock().await.back().cloned() { - if let Err(err) = sub.notify(serde_json::json!(event)).await { - return Err(Error::Other(format!("failed to notify: {err}"))); - } - } - } -} - -async fn notify_failed(result: TaskResult<Result<()>>) { - if let TaskResult::Completed(Err(err)) = result { - error!("Error: {err}"); - } -} diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs new file mode 100644 index 0000000..15ce8da --- /dev/null +++ b/p2p/examples/monitor/src/service.rs @@ -0,0 +1,309 @@ +use std::{collections::HashMap, sync::Arc}; + +use futures::stream::{FuturesUnordered, StreamExt}; +use log::{debug, error}; +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use serde::{Deserialize, Serialize}; +use smol::{lock::Mutex, Executor}; + +use karyon_core::async_util::{TaskGroup, TaskResult}; +use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Subscription}; +use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result}; + +const EVENT_BUFFER_SIZE: usize = 60; + +pub struct MonitorRPC { + backend: ArcBackend, + subs: Arc<Subscriptions>, + task_group: TaskGroup, +} + +impl MonitorRPC { + pub fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> { + Arc::new(MonitorRPC { + backend, + task_group: TaskGroup::with_executor(ex.clone().into()), + subs: Subscriptions::new(ex), + }) + } + + pub async fn run(self: &Arc<Self>) -> Result<()> { + let conn_events = self.backend.monitor().conn_events().await; + let peer_pool_events = self.backend.monitor().peer_pool_events().await; + let discovery_events = self.backend.monitor().discovery_events().await; + + let on_failuer = |res: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = res { + error!("Event receive loop: {err}") + } + }; + + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + let event = conn_events.recv().await?; + selfc.subs.notify(MonitorTopic::Conn, event).await; + } + }, + on_failuer, + ); + + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + let event = peer_pool_events.recv().await?; + selfc.subs.notify(MonitorTopic::PeerPool, event).await; + } + }, + on_failuer, + ); + + let selfc = self.clone(); + self.task_group.spawn( + async move { + loop { + let event = discovery_events.recv().await?; + selfc.subs.notify(MonitorTopic::Discovery, event).await; + } + }, + on_failuer, + ); + + Ok(()) + } + + pub async fn shutdown(&self) { + self.task_group.cancel().await; + self.subs.stop().await; + } +} + +#[rpc_impl] +impl MonitorRPC { + pub async fn ping( + &self, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + Ok(serde_json::json!(Pong {})) + } + + pub async fn peer_id( + &self, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + Ok(serde_json::json!(self.backend.peer_id().to_string())) + } + + pub async fn inbound_connection( + &self, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + Ok(serde_json::json!(self.backend.inbound_slots())) + } + + pub async fn outbound_connection( + &self, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + Ok(serde_json::json!(self.backend.outbound_slots())) + } +} + +#[rpc_pubsub_impl] +impl MonitorRPC { + pub async fn conn_subscribe( + &self, + chan: Arc<Channel>, + method: String, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id; + + self.subs.add(MonitorTopic::Conn, sub).await; + + Ok(serde_json::json!(sub_id)) + } + + pub async fn peer_pool_subscribe( + &self, + chan: Arc<Channel>, + method: String, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id; + + self.subs.add(MonitorTopic::PeerPool, sub).await; + + Ok(serde_json::json!(sub_id)) + } + + pub async fn discovery_subscribe( + &self, + chan: Arc<Channel>, + method: String, + _params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub = chan.new_subscription(&method).await; + let sub_id = sub.id; + + self.subs.add(MonitorTopic::Discovery, sub).await; + + Ok(serde_json::json!(sub_id)) + } + + pub async fn conn_unsubscribe( + &self, + chan: Arc<Channel>, + _method: String, + params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } + + pub async fn peer_pool_unsubscribe( + &self, + chan: Arc<Channel>, + _method: String, + params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } + + pub async fn discovery_unsubscribe( + &self, + chan: Arc<Channel>, + _method: String, + params: serde_json::Value, + ) -> karyon_jsonrpc::Result<serde_json::Value> { + let sub_id: SubscriptionID = serde_json::from_value(params)?; + chan.remove_subscription(&sub_id).await; + Ok(serde_json::json!(true)) + } +} + +#[derive(Deserialize, Serialize)] +struct Pong {} + +struct Subscriptions { + subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>, + buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<serde_json::Value>>>, + task_group: TaskGroup, +} + +impl Subscriptions { + fn new(ex: Arc<Executor<'static>>) -> Arc<Self> { + let mut subs = HashMap::new(); + subs.insert(MonitorTopic::Conn, HashMap::new()); + subs.insert(MonitorTopic::PeerPool, HashMap::new()); + subs.insert(MonitorTopic::Discovery, HashMap::new()); + + let mut buffer = HashMap::new(); + buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE)); + buffer.insert( + MonitorTopic::PeerPool, + AllocRingBuffer::new(EVENT_BUFFER_SIZE), + ); + buffer.insert( + MonitorTopic::Discovery, + AllocRingBuffer::new(EVENT_BUFFER_SIZE), + ); + + Arc::new(Self { + subs: Mutex::new(subs), + buffer: Mutex::new(buffer), + task_group: TaskGroup::with_executor(ex.into()), + }) + } + + /// Adds the subscription to the subs map according to the given type + async fn add(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) { + match self.subs.lock().await.get_mut(&ty) { + Some(subs) => { + subs.insert(sub.id, sub.clone()); + } + None => todo!(), + } + // Send old events in the buffer to the subscriber + self.send_old_events(ty, sub).await; + } + + /// Notifies all subscribers + async fn notify<T: Serialize>(&self, ty: MonitorTopic, event: T) { + let event = serde_json::json!(event); + // Add the new event to the ringbuffer + match self.buffer.lock().await.get_mut(&ty) { + Some(events) => events.push(event.clone()), + None => todo!(), + } + + // Notify the subscribers + match self.subs.lock().await.get_mut(&ty) { + Some(subs) => { + let mut fulist = FuturesUnordered::new(); + + for sub in subs.values() { + let fu = async { (sub.id, sub.notify(event.clone()).await) }; + fulist.push(fu) + } + + let mut cleanup_list = vec![]; + while let Some((sub_id, result)) = fulist.next().await { + if let Err(err) = result { + error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty); + cleanup_list.push(sub_id); + } + } + drop(fulist); + + for sub_id in cleanup_list { + subs.remove(&sub_id); + } + } + None => todo!(), + } + } + + /// Sends old events in the ringbuffer to the new subscriber. + async fn send_old_events(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) { + let ty_cloned = ty.clone(); + let sub_id = sub.id; + let on_complete = move |res: TaskResult<()>| async move { + debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id); + }; + + let selfc = self.clone(); + self.task_group.spawn( + async move { + match selfc.buffer.lock().await.get_mut(&ty) { + Some(events) => { + let mut fu = FuturesUnordered::new(); + + for event in events.iter().rev() { + fu.push(sub.notify(event.clone())) + } + + while let Some(result) = fu.next().await { + if result.is_err() { + return; + } + } + } + None => todo!(), + } + }, + on_complete, + ); + } + + async fn stop(&self) { + self.task_group.cancel().await; + } +} |