From 135968d8f1379a6d2f32cbbc3e5b77a5f317a4d6 Mon Sep 17 00:00:00 2001
From: hozan23 <hozan23@karyontech.net>
Date: Mon, 24 Jun 2024 02:18:03 +0200
Subject: p2p/examples: remove redundant code

---
 p2p/examples/monitor/src/client.rs  |  79 ----------
 p2p/examples/monitor/src/main.rs    | 108 -------------
 p2p/examples/monitor/src/service.rs | 300 ------------------------------------
 p2p/examples/monitor/src/shared.rs  |  31 ----
 4 files changed, 518 deletions(-)
 delete mode 100644 p2p/examples/monitor/src/client.rs
 delete mode 100644 p2p/examples/monitor/src/main.rs
 delete mode 100644 p2p/examples/monitor/src/service.rs
 delete mode 100644 p2p/examples/monitor/src/shared.rs

(limited to 'p2p/examples/monitor/src')

diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs
deleted file mode 100644
index 27e7b6f..0000000
--- a/p2p/examples/monitor/src/client.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use clap::Parser;
-use serde::{Deserialize, Serialize};
-
-use karyon_jsonrpc::Client;
-use karyon_p2p::endpoint::Endpoint;
-
-#[derive(Parser)]
-#[command(author, version, about, long_about = None)]
-struct Cli {
-    /// RPC server endpoint.
-    #[arg(short)]
-    rpc_endpoint: Endpoint,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Pong {}
-
-fn main() {
-    smol::block_on(async {
-        env_logger::init();
-        let cli = Cli::parse();
-
-        let rpc = Client::builder(cli.rpc_endpoint)
-            .expect("Create rpc client builder")
-            .build()
-            .await
-            .expect("Create rpc client");
-
-        let sub = rpc
-            .subscribe("MonitorRPC.conn_subscribe", ())
-            .await
-            .expect("Subscribe to connection events");
-
-        let sub2 = rpc
-            .subscribe("MonitorRPC.peer_pool_subscribe", ())
-            .await
-            .expect("Subscribe to peer pool events");
-
-        let sub3 = rpc
-            .subscribe("MonitorRPC.discovery_subscribe", ())
-            .await
-            .expect("Subscribe to discovery events");
-
-        smol::spawn(async move {
-            loop {
-                let event = sub.recv().await.expect("Receive connection event");
-                println!("Receive new connection event: {event}");
-            }
-        })
-        .detach();
-
-        smol::spawn(async move {
-            loop {
-                let event = sub2.recv().await.expect("Receive peer pool event");
-                println!("Receive new peerpool event: {event}");
-            }
-        })
-        .detach();
-
-        smol::spawn(async move {
-            loop {
-                let event = sub3.recv().await.expect("Receive discovery event");
-                println!("Receive new discovery event: {event}");
-            }
-        })
-        .detach();
-
-        // start ping-pong loop
-        loop {
-            smol::Timer::after(std::time::Duration::from_secs(1)).await;
-            let _: Pong = rpc
-                .call("MonitorRPC.ping", ())
-                .await
-                .expect("Receive pong message");
-
-            println!("Receive pong message");
-        }
-    });
-}
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
deleted file mode 100644
index 78ada48..0000000
--- a/p2p/examples/monitor/src/main.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-mod service;
-mod shared;
-
-use std::sync::Arc;
-
-use clap::Parser;
-use smol::{channel, Executor};
-
-use karyon_jsonrpc::Server;
-use karyon_p2p::{
-    endpoint::{Endpoint, Port},
-    keypair::{KeyPair, KeyPairType},
-    Backend, Config,
-};
-
-use service::MonitorRPC;
-use shared::run_executor;
-
-#[derive(Parser)]
-#[command(author, version, about, long_about = None)]
-struct Cli {
-    /// Optional list of bootstrap peers to start the seeding process.
-    #[arg(short)]
-    bootstrap_peers: Vec<Endpoint>,
-
-    /// RPC server endpoint.
-    #[arg(short)]
-    rpc_endpoint: Endpoint,
-
-    /// Optional list of peer endpoints for manual connections.
-    #[arg(short)]
-    peer_endpoints: Vec<Endpoint>,
-
-    /// Optional endpoint for accepting incoming connections.
-    #[arg(short)]
-    listen_endpoint: Option<Endpoint>,
-
-    /// Optional TCP/UDP port for the discovery service.
-    #[arg(short)]
-    discovery_port: Option<Port>,
-}
-
-fn main() {
-    env_logger::init();
-
-    let cli = Cli::parse();
-
-    let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
-
-    // Create the configuration for the backend.
-    let config = Config {
-        listen_endpoint: cli.listen_endpoint,
-        peer_endpoints: cli.peer_endpoints,
-        bootstrap_peers: cli.bootstrap_peers,
-        discovery_port: cli.discovery_port.unwrap_or(0),
-        enable_monitor: true,
-        ..Default::default()
-    };
-
-    // Create a new Executor
-    let ex = Arc::new(Executor::new());
-
-    // Create a new Backend
-    let backend = Backend::new(&key_pair, config, ex.clone().into());
-
-    let (ctrlc_s, ctrlc_r) = channel::unbounded();
-    let handle = move || ctrlc_s.try_send(()).unwrap();
-    ctrlc::set_handler(handle).unwrap();
-
-    let exc = ex.clone();
-    run_executor(
-        async {
-            // RPC service
-            let service = MonitorRPC::new(backend.clone(), exc.clone());
-
-            // Create rpc server
-            let server = Server::builder(cli.rpc_endpoint)
-                .expect("Create server builder")
-                .service(service.clone())
-                .pubsub_service(service.clone())
-                .build_with_executor(exc.clone().into())
-                .await
-                .expect("Build rpc server");
-
-            // Run the RPC server
-            server.start();
-
-            // Run the RPC Service
-            service.run().await.expect("Run monitor rpc service");
-
-            // Run the backend
-            backend.run().await.expect("Run p2p backend");
-
-            // Wait for ctrlc signal
-            ctrlc_r.recv().await.expect("Wait for ctrlc signal");
-
-            // Shutdown the backend
-            backend.shutdown().await;
-
-            // Shutdown the RPC server
-            server.shutdown().await;
-
-            // Shutdown the RPC service
-            service.shutdown().await;
-        },
-        ex,
-    );
-}
diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs
deleted file mode 100644
index bc6ab81..0000000
--- a/p2p/examples/monitor/src/service.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use futures::stream::{FuturesUnordered, StreamExt};
-use log::{debug, error};
-use ringbuffer::{AllocRingBuffer, RingBuffer};
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use smol::{lock::Mutex, Executor};
-
-use karyon_core::async_util::{TaskGroup, TaskResult};
-use karyon_jsonrpc::{
-    message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription,
-};
-use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result};
-
-const EVENT_BUFFER_SIZE: usize = 60;
-
-pub struct MonitorRPC {
-    backend: ArcBackend,
-    subs: Arc<Subscriptions>,
-    task_group: TaskGroup,
-}
-
-impl MonitorRPC {
-    pub fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
-        Arc::new(MonitorRPC {
-            backend,
-            task_group: TaskGroup::with_executor(ex.clone().into()),
-            subs: Subscriptions::new(ex),
-        })
-    }
-
-    pub async fn run(self: &Arc<Self>) -> Result<()> {
-        let conn_events = self.backend.monitor().conn_events().await;
-        let peer_pool_events = self.backend.monitor().peer_pool_events().await;
-        let discovery_events = self.backend.monitor().discovery_events().await;
-
-        let on_failuer = |res: TaskResult<Result<()>>| async move {
-            if let TaskResult::Completed(Err(err)) = res {
-                error!("Event receive loop: {err}")
-            }
-        };
-
-        let selfc = self.clone();
-        self.task_group.spawn(
-            async move {
-                loop {
-                    let event = conn_events.recv().await?;
-                    selfc.subs.notify(MonitorTopic::Conn, event).await;
-                }
-            },
-            on_failuer,
-        );
-
-        let selfc = self.clone();
-        self.task_group.spawn(
-            async move {
-                loop {
-                    let event = peer_pool_events.recv().await?;
-                    selfc.subs.notify(MonitorTopic::PeerPool, event).await;
-                }
-            },
-            on_failuer,
-        );
-
-        let selfc = self.clone();
-        self.task_group.spawn(
-            async move {
-                loop {
-                    let event = discovery_events.recv().await?;
-                    selfc.subs.notify(MonitorTopic::Discovery, event).await;
-                }
-            },
-            on_failuer,
-        );
-
-        Ok(())
-    }
-
-    pub async fn shutdown(&self) {
-        self.task_group.cancel().await;
-        self.subs.stop().await;
-    }
-}
-
-#[rpc_impl]
-impl MonitorRPC {
-    pub async fn ping(&self, _params: Value) -> RPCResult<Value> {
-        Ok(serde_json::json!(Pong {}))
-    }
-
-    pub async fn peer_id(&self, _params: Value) -> RPCResult<Value> {
-        Ok(serde_json::json!(self.backend.peer_id().to_string()))
-    }
-
-    pub async fn inbound_connection(&self, _params: Value) -> RPCResult<Value> {
-        Ok(serde_json::json!(self.backend.inbound_slots()))
-    }
-
-    pub async fn outbound_connection(&self, _params: Value) -> RPCResult<Value> {
-        Ok(serde_json::json!(self.backend.outbound_slots()))
-    }
-}
-
-#[rpc_pubsub_impl]
-impl MonitorRPC {
-    pub async fn conn_subscribe(
-        &self,
-        chan: Arc<Channel>,
-        method: String,
-        _params: Value,
-    ) -> RPCResult<Value> {
-        let sub = chan.new_subscription(&method).await;
-        let sub_id = sub.id;
-
-        self.subs.add(MonitorTopic::Conn, sub).await;
-
-        Ok(serde_json::json!(sub_id))
-    }
-
-    pub async fn peer_pool_subscribe(
-        &self,
-        chan: Arc<Channel>,
-        method: String,
-        _params: Value,
-    ) -> RPCResult<Value> {
-        let sub = chan.new_subscription(&method).await;
-        let sub_id = sub.id;
-
-        self.subs.add(MonitorTopic::PeerPool, sub).await;
-
-        Ok(serde_json::json!(sub_id))
-    }
-
-    pub async fn discovery_subscribe(
-        &self,
-        chan: Arc<Channel>,
-        method: String,
-        _params: Value,
-    ) -> RPCResult<Value> {
-        let sub = chan.new_subscription(&method).await;
-        let sub_id = sub.id;
-
-        self.subs.add(MonitorTopic::Discovery, sub).await;
-
-        Ok(serde_json::json!(sub_id))
-    }
-
-    pub async fn conn_unsubscribe(
-        &self,
-        chan: Arc<Channel>,
-        _method: String,
-        params: Value,
-    ) -> RPCResult<Value> {
-        let sub_id: SubscriptionID = serde_json::from_value(params)?;
-        chan.remove_subscription(&sub_id).await;
-        Ok(serde_json::json!(true))
-    }
-
-    pub async fn peer_pool_unsubscribe(
-        &self,
-        chan: Arc<Channel>,
-        _method: String,
-        params: Value,
-    ) -> RPCResult<Value> {
-        let sub_id: SubscriptionID = serde_json::from_value(params)?;
-        chan.remove_subscription(&sub_id).await;
-        Ok(serde_json::json!(true))
-    }
-
-    pub async fn discovery_unsubscribe(
-        &self,
-        chan: Arc<Channel>,
-        _method: String,
-        params: Value,
-    ) -> RPCResult<Value> {
-        let sub_id: SubscriptionID = serde_json::from_value(params)?;
-        chan.remove_subscription(&sub_id).await;
-        Ok(serde_json::json!(true))
-    }
-}
-
-#[derive(Deserialize, Serialize)]
-struct Pong {}
-
-struct Subscriptions {
-    subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>,
-    buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<Value>>>,
-    task_group: TaskGroup,
-}
-
-impl Subscriptions {
-    fn new(ex: Arc<Executor<'static>>) -> Arc<Self> {
-        let mut subs = HashMap::new();
-        subs.insert(MonitorTopic::Conn, HashMap::new());
-        subs.insert(MonitorTopic::PeerPool, HashMap::new());
-        subs.insert(MonitorTopic::Discovery, HashMap::new());
-
-        let mut buffer = HashMap::new();
-        buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE));
-        buffer.insert(
-            MonitorTopic::PeerPool,
-            AllocRingBuffer::new(EVENT_BUFFER_SIZE),
-        );
-        buffer.insert(
-            MonitorTopic::Discovery,
-            AllocRingBuffer::new(EVENT_BUFFER_SIZE),
-        );
-
-        Arc::new(Self {
-            subs: Mutex::new(subs),
-            buffer: Mutex::new(buffer),
-            task_group: TaskGroup::with_executor(ex.into()),
-        })
-    }
-
-    /// Adds the subscription to the subs map according to the given type
-    async fn add(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
-        match self.subs.lock().await.get_mut(&ty) {
-            Some(subs) => {
-                subs.insert(sub.id, sub.clone());
-            }
-            None => todo!(),
-        }
-        // Send old events in the buffer to the subscriber
-        self.send_old_events(ty, sub).await;
-    }
-
-    /// Notifies all subscribers   
-    async fn notify<T: Serialize>(&self, ty: MonitorTopic, event: T) {
-        let event = serde_json::json!(event);
-        // Add the new event to the ringbuffer
-        match self.buffer.lock().await.get_mut(&ty) {
-            Some(events) => events.push(event.clone()),
-            None => todo!(),
-        }
-
-        // Notify the subscribers
-        match self.subs.lock().await.get_mut(&ty) {
-            Some(subs) => {
-                let mut fulist = FuturesUnordered::new();
-
-                for sub in subs.values() {
-                    let fu = async { (sub.id, sub.notify(event.clone()).await) };
-                    fulist.push(fu)
-                }
-
-                let mut cleanup_list = vec![];
-                while let Some((sub_id, result)) = fulist.next().await {
-                    if let Err(err) = result {
-                        error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty);
-                        cleanup_list.push(sub_id);
-                    }
-                }
-                drop(fulist);
-
-                for sub_id in cleanup_list {
-                    subs.remove(&sub_id);
-                }
-            }
-            None => todo!(),
-        }
-    }
-
-    /// Sends old events in the ringbuffer to the new subscriber.
-    async fn send_old_events(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
-        let ty_cloned = ty.clone();
-        let sub_id = sub.id;
-        let on_complete = move |res: TaskResult<()>| async move {
-            debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id);
-        };
-
-        let selfc = self.clone();
-        self.task_group.spawn(
-            async move {
-                match selfc.buffer.lock().await.get_mut(&ty) {
-                    Some(events) => {
-                        let mut fu = FuturesUnordered::new();
-
-                        for event in events.iter().rev() {
-                            fu.push(sub.notify(event.clone()))
-                        }
-
-                        while let Some(result) = fu.next().await {
-                            if result.is_err() {
-                                return;
-                            }
-                        }
-                    }
-                    None => todo!(),
-                }
-            },
-            on_complete,
-        );
-    }
-
-    async fn stop(&self) {
-        self.task_group.cancel().await;
-    }
-}
diff --git a/p2p/examples/monitor/src/shared.rs b/p2p/examples/monitor/src/shared.rs
deleted file mode 100644
index 0e8079c..0000000
--- a/p2p/examples/monitor/src/shared.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::{num::NonZeroUsize, sync::Arc, thread};
-
-use easy_parallel::Parallel;
-use smol::{channel, future, future::Future, Executor};
-
-/// Returns an estimate of the default amount of parallelism a program should use.
-/// see `std::thread::available_parallelism`
-pub fn available_parallelism() -> usize {
-    thread::available_parallelism()
-        .map(NonZeroUsize::get)
-        .unwrap_or(1)
-}
-
-/// Run a multi-threaded executor
-pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) {
-    let (signal, shutdown) = channel::unbounded::<()>();
-
-    let num_threads = available_parallelism();
-
-    Parallel::new()
-        .each(0..(num_threads), |_| {
-            future::block_on(ex.run(shutdown.recv()))
-        })
-        // Run the main future on the current thread.
-        .finish(|| {
-            future::block_on(async {
-                main_future.await;
-                drop(signal);
-            })
-        });
-}
-- 
cgit v1.2.3