From 135968d8f1379a6d2f32cbbc3e5b77a5f317a4d6 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 24 Jun 2024 02:18:03 +0200 Subject: p2p/examples: remove redundant code --- p2p/examples/monitor/src/service.rs | 300 ------------------------------------ 1 file changed, 300 deletions(-) delete mode 100644 p2p/examples/monitor/src/service.rs (limited to 'p2p/examples/monitor/src/service.rs') diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs deleted file mode 100644 index bc6ab81..0000000 --- a/p2p/examples/monitor/src/service.rs +++ /dev/null @@ -1,300 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use futures::stream::{FuturesUnordered, StreamExt}; -use log::{debug, error}; -use ringbuffer::{AllocRingBuffer, RingBuffer}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use smol::{lock::Mutex, Executor}; - -use karyon_core::async_util::{TaskGroup, TaskResult}; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription, -}; -use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result}; - -const EVENT_BUFFER_SIZE: usize = 60; - -pub struct MonitorRPC { - backend: ArcBackend, - subs: Arc, - task_group: TaskGroup, -} - -impl MonitorRPC { - pub fn new(backend: ArcBackend, ex: Arc>) -> Arc { - Arc::new(MonitorRPC { - backend, - task_group: TaskGroup::with_executor(ex.clone().into()), - subs: Subscriptions::new(ex), - }) - } - - pub async fn run(self: &Arc) -> Result<()> { - let conn_events = self.backend.monitor().conn_events().await; - let peer_pool_events = self.backend.monitor().peer_pool_events().await; - let discovery_events = self.backend.monitor().discovery_events().await; - - let on_failuer = |res: TaskResult>| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Event receive loop: {err}") - } - }; - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = conn_events.recv().await?; - selfc.subs.notify(MonitorTopic::Conn, event).await; - } - }, - on_failuer, - ); - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = peer_pool_events.recv().await?; - selfc.subs.notify(MonitorTopic::PeerPool, event).await; - } - }, - on_failuer, - ); - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = discovery_events.recv().await?; - selfc.subs.notify(MonitorTopic::Discovery, event).await; - } - }, - on_failuer, - ); - - Ok(()) - } - - pub async fn shutdown(&self) { - self.task_group.cancel().await; - self.subs.stop().await; - } -} - -#[rpc_impl] -impl MonitorRPC { - pub async fn ping(&self, _params: Value) -> RPCResult { - Ok(serde_json::json!(Pong {})) - } - - pub async fn peer_id(&self, _params: Value) -> RPCResult { - Ok(serde_json::json!(self.backend.peer_id().to_string())) - } - - pub async fn inbound_connection(&self, _params: Value) -> RPCResult { - Ok(serde_json::json!(self.backend.inbound_slots())) - } - - pub async fn outbound_connection(&self, _params: Value) -> RPCResult { - Ok(serde_json::json!(self.backend.outbound_slots())) - } -} - -#[rpc_pubsub_impl] -impl MonitorRPC { - pub async fn conn_subscribe( - &self, - chan: Arc, - method: String, - _params: Value, - ) -> RPCResult { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::Conn, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn peer_pool_subscribe( - &self, - chan: Arc, - method: String, - _params: Value, - ) -> RPCResult { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::PeerPool, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn discovery_subscribe( - &self, - chan: Arc, - method: String, - _params: Value, - ) -> RPCResult { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::Discovery, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn conn_unsubscribe( - &self, - chan: Arc, - _method: String, - params: Value, - ) -> RPCResult { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - pub async fn peer_pool_unsubscribe( - &self, - chan: Arc, - _method: String, - params: Value, - ) -> RPCResult { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - pub async fn discovery_unsubscribe( - &self, - chan: Arc, - _method: String, - params: Value, - ) -> RPCResult { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } -} - -#[derive(Deserialize, Serialize)] -struct Pong {} - -struct Subscriptions { - subs: Mutex>>, - buffer: Mutex>>, - task_group: TaskGroup, -} - -impl Subscriptions { - fn new(ex: Arc>) -> Arc { - let mut subs = HashMap::new(); - subs.insert(MonitorTopic::Conn, HashMap::new()); - subs.insert(MonitorTopic::PeerPool, HashMap::new()); - subs.insert(MonitorTopic::Discovery, HashMap::new()); - - let mut buffer = HashMap::new(); - buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE)); - buffer.insert( - MonitorTopic::PeerPool, - AllocRingBuffer::new(EVENT_BUFFER_SIZE), - ); - buffer.insert( - MonitorTopic::Discovery, - AllocRingBuffer::new(EVENT_BUFFER_SIZE), - ); - - Arc::new(Self { - subs: Mutex::new(subs), - buffer: Mutex::new(buffer), - task_group: TaskGroup::with_executor(ex.into()), - }) - } - - /// Adds the subscription to the subs map according to the given type - async fn add(self: &Arc, ty: MonitorTopic, sub: Subscription) { - match self.subs.lock().await.get_mut(&ty) { - Some(subs) => { - subs.insert(sub.id, sub.clone()); - } - None => todo!(), - } - // Send old events in the buffer to the subscriber - self.send_old_events(ty, sub).await; - } - - /// Notifies all subscribers - async fn notify(&self, ty: MonitorTopic, event: T) { - let event = serde_json::json!(event); - // Add the new event to the ringbuffer - match self.buffer.lock().await.get_mut(&ty) { - Some(events) => events.push(event.clone()), - None => todo!(), - } - - // Notify the subscribers - match self.subs.lock().await.get_mut(&ty) { - Some(subs) => { - let mut fulist = FuturesUnordered::new(); - - for sub in subs.values() { - let fu = async { (sub.id, sub.notify(event.clone()).await) }; - fulist.push(fu) - } - - let mut cleanup_list = vec![]; - while let Some((sub_id, result)) = fulist.next().await { - if let Err(err) = result { - error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty); - cleanup_list.push(sub_id); - } - } - drop(fulist); - - for sub_id in cleanup_list { - subs.remove(&sub_id); - } - } - None => todo!(), - } - } - - /// Sends old events in the ringbuffer to the new subscriber. - async fn send_old_events(self: &Arc, ty: MonitorTopic, sub: Subscription) { - let ty_cloned = ty.clone(); - let sub_id = sub.id; - let on_complete = move |res: TaskResult<()>| async move { - debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id); - }; - - let selfc = self.clone(); - self.task_group.spawn( - async move { - match selfc.buffer.lock().await.get_mut(&ty) { - Some(events) => { - let mut fu = FuturesUnordered::new(); - - for event in events.iter().rev() { - fu.push(sub.notify(event.clone())) - } - - while let Some(result) = fu.next().await { - if result.is_err() { - return; - } - } - } - None => todo!(), - } - }, - on_complete, - ); - } - - async fn stop(&self) { - self.task_group.cancel().await; - } -} -- cgit v1.2.3