diff options
Diffstat (limited to 'p2p/examples/monitor/src')
| -rw-r--r-- | p2p/examples/monitor/src/client.rs | 48 | ||||
| -rw-r--r-- | p2p/examples/monitor/src/main.rs | 335 | ||||
| -rw-r--r-- | p2p/examples/monitor/src/shared.rs | 31 | 
3 files changed, 414 insertions, 0 deletions
| diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs new file mode 100644 index 0000000..d4970eb --- /dev/null +++ b/p2p/examples/monitor/src/client.rs @@ -0,0 +1,48 @@ +use clap::Parser; + +use karyon_jsonrpc::Client; +use karyon_p2p::endpoint::Endpoint; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { +    /// RPC server endpoint. +    #[arg(short)] +    rpc_endpoint: Endpoint, +} + +fn main() { +    smol::block_on(async { +        env_logger::init(); +        let cli = Cli::parse(); + +        let rpc = Client::builder(cli.rpc_endpoint) +            .expect("Create rpc client builder") +            .build() +            .await +            .expect("Create rpc client"); + +        let (_, sub) = rpc +            .subscribe("MonitorRPC.conn_subscribe", ()) +            .await +            .expect("Subscribe to connection events"); + +        let (_, sub2) = rpc +            .subscribe("MonitorRPC.peer_pool_subscribe", ()) +            .await +            .expect("Subscribe to peer pool events"); + +        smol::spawn(async move { +            loop { +                let _event = sub.recv().await.expect("Receive connection event"); +            } +        }) +        .detach(); + +        smol::spawn(async move { +            loop { +                let _event = sub2.recv().await.expect("Receive peer pool event"); +            } +        }).await; +    }); +} diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs new file mode 100644 index 0000000..636d652 --- /dev/null +++ b/p2p/examples/monitor/src/main.rs @@ -0,0 +1,335 @@ +mod shared; + +use std::sync::Arc; + +use clap::Parser; +use log::error; +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use serde::{Deserialize, Serialize}; +use smol::{channel, lock::Mutex, Executor}; + +use karyon_core::async_util::{CondWait, TaskGroup, TaskResult}; +use karyon_jsonrpc::{rpc_impl, rpc_pubsub_impl, ArcChannel, Server, Subscription, SubscriptionID}; +use karyon_p2p::{ +    endpoint::{Endpoint, Port}, +    keypair::{KeyPair, KeyPairType}, +    monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent}, +    ArcBackend, Backend, Config, Error, Result, +}; + +use shared::run_executor; + +const EVENT_BUFFER_SIZE: usize = 30; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { +    /// Optional list of bootstrap peers to start the seeding process. +    #[arg(short)] +    bootstrap_peers: Vec<Endpoint>, + +    /// RPC server endpoint. +    #[arg(short)] +    rpc_endpoint: Endpoint, + +    /// Optional list of peer endpoints for manual connections. +    #[arg(short)] +    peer_endpoints: Vec<Endpoint>, + +    /// Optional endpoint for accepting incoming connections. +    #[arg(short)] +    listen_endpoint: Option<Endpoint>, + +    /// Optional TCP/UDP port for the discovery service. +    #[arg(short)] +    discovery_port: Option<Port>, +} + +struct MonitorRPC { +    backend: ArcBackend, +    conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>, +    pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>, +    discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>, +    conn_event_condvar: Arc<CondWait>, +    pp_event_condvar: Arc<CondWait>, +    discv_event_condvar: Arc<CondWait>, +    task_group: TaskGroup, +} + +impl MonitorRPC { +    fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> { +        Arc::new(MonitorRPC { +            backend, +            conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), +            pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), +            discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))), +            conn_event_condvar: Arc::new(CondWait::new()), +            pp_event_condvar: Arc::new(CondWait::new()), +            discv_event_condvar: Arc::new(CondWait::new()), +            task_group: TaskGroup::with_executor(ex.into()), +        }) +    } + +    async fn run(&self) -> Result<()> { +        let conn_events = self.backend.monitor().conn_events().await; +        let peer_pool_events = self.backend.monitor().peer_pool_events().await; +        let discovery_events = self.backend.monitor().discovery_events().await; + +        let conn_event_buffer = self.conn_event_buffer.clone(); +        let pp_event_buffer = self.pp_event_buffer.clone(); +        let discv_event_buffer = self.discv_event_buffer.clone(); + +        let conn_event_condvar = self.conn_event_condvar.clone(); +        let pp_event_condvar = self.pp_event_condvar.clone(); +        let discv_event_condvar = self.discv_event_condvar.clone(); + +        let on_failuer = |res: TaskResult<Result<()>>| async move { +            if let TaskResult::Completed(Err(err)) = res { +                error!("Event receive loop: {err}") +            } +        }; + +        self.task_group.spawn( +            async move { +                loop { +                    let event = conn_events.recv().await?; +                    conn_event_buffer.lock().await.push(event); +                    conn_event_condvar.broadcast().await; +                } +            }, +            on_failuer, +        ); + +        self.task_group.spawn( +            async move { +                loop { +                    let event = peer_pool_events.recv().await?; +                    pp_event_buffer.lock().await.push(event); +                    pp_event_condvar.broadcast().await; +                } +            }, +            on_failuer, +        ); + +        self.task_group.spawn( +            async move { +                loop { +                    let event = discovery_events.recv().await?; +                    discv_event_buffer.lock().await.push(event); +                    discv_event_condvar.broadcast().await; +                } +            }, +            on_failuer, +        ); + +        Ok(()) +    } + +    async fn shutdown(&self) { +        self.task_group.cancel().await; +    } +} + +#[rpc_impl] +impl MonitorRPC { +    async fn peer_id( +        &self, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        Ok(serde_json::json!(self.backend.peer_id().to_string())) +    } + +    async fn inbound_connection( +        &self, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        Ok(serde_json::json!(self.backend.inbound_slots())) +    } + +    async fn outbound_connection( +        &self, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        Ok(serde_json::json!(self.backend.outbound_slots())) +    } +} + +#[rpc_pubsub_impl] +impl MonitorRPC { +    async fn conn_subscribe( +        &self, +        chan: ArcChannel, +        method: String, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub = chan.new_subscription(&method).await; +        let sub_id = sub.id.clone(); + +        let cond_wait = self.conn_event_condvar.clone(); +        let buffer = self.conn_event_buffer.clone(); +        self.task_group +            .spawn(notify(sub, cond_wait, buffer), notify_failed); + +        Ok(serde_json::json!(sub_id)) +    } + +    async fn peer_pool_subscribe( +        &self, +        chan: ArcChannel, +        method: String, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub = chan.new_subscription(&method).await; +        let sub_id = sub.id.clone(); + +        let cond_wait = self.pp_event_condvar.clone(); +        let buffer = self.pp_event_buffer.clone(); +        self.task_group +            .spawn(notify(sub, cond_wait, buffer), notify_failed); + +        Ok(serde_json::json!(sub_id)) +    } + +    async fn discovery_subscribe( +        &self, +        chan: ArcChannel, +        method: String, +        _params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub = chan.new_subscription(&method).await; +        let sub_id = sub.id.clone(); + +        let cond_wait = self.discv_event_condvar.clone(); +        let buffer = self.discv_event_buffer.clone(); +        self.task_group +            .spawn(notify(sub, cond_wait, buffer), notify_failed); + +        Ok(serde_json::json!(sub_id)) +    } + +    async fn conn_unsubscribe( +        &self, +        chan: ArcChannel, +        _method: String, +        params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub_id: SubscriptionID = serde_json::from_value(params)?; +        chan.remove_subscription(&sub_id).await; +        Ok(serde_json::json!(true)) +    } + +    async fn peer_pool_unsubscribe( +        &self, +        chan: ArcChannel, +        _method: String, +        params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub_id: SubscriptionID = serde_json::from_value(params)?; +        chan.remove_subscription(&sub_id).await; +        Ok(serde_json::json!(true)) +    } + +    async fn discovery_unsubscribe( +        &self, +        chan: ArcChannel, +        _method: String, +        params: serde_json::Value, +    ) -> karyon_jsonrpc::Result<serde_json::Value> { +        let sub_id: SubscriptionID = serde_json::from_value(params)?; +        chan.remove_subscription(&sub_id).await; +        Ok(serde_json::json!(true)) +    } +} + +fn main() { +    env_logger::init(); +    let cli = Cli::parse(); + +    let key_pair = KeyPair::generate(&KeyPairType::Ed25519); + +    // Create the configuration for the backend. +    let config = Config { +        listen_endpoint: cli.listen_endpoint, +        peer_endpoints: cli.peer_endpoints, +        bootstrap_peers: cli.bootstrap_peers, +        discovery_port: cli.discovery_port.unwrap_or(0), +        enable_monitor: true, +        ..Default::default() +    }; + +    // Create a new Executor +    let ex = Arc::new(Executor::new()); + +    // Create a new Backend +    let backend = Backend::new(&key_pair, config, ex.clone().into()); + +    let (ctrlc_s, ctrlc_r) = channel::unbounded(); +    let handle = move || ctrlc_s.try_send(()).unwrap(); +    ctrlc::set_handler(handle).unwrap(); + +    let exc = ex.clone(); +    run_executor( +        async { +            // RPC service +            let service = MonitorRPC::new(backend.clone(), exc.clone()); + +            // Create rpc server +            let server = Server::builder(cli.rpc_endpoint) +                .expect("Create server builder") +                .service(service.clone()) +                .pubsub_service(service.clone()) +                .build_with_executor(exc.clone().into()) +                .await +                .expect("Build rpc server"); + +            // Run the RPC server +            server.start().await; + +            // Run the RPC Service +            service.run().await.expect("Run monitor rpc service"); + +            // Run the backend +            backend.run().await.expect("Run p2p backend"); + +            // Wait for ctrlc signal +            ctrlc_r.recv().await.expect("Wait for ctrlc signal"); + +            // Shutdown the backend +            backend.shutdown().await; + +            // Shutdown the RPC server +            server.shutdown().await; + +            // Shutdown the RPC service +            service.shutdown().await; +        }, +        ex, +    ); +} + +async fn notify<T: Serialize + Deserialize<'static> + Clone>( +    sub: Subscription, +    cond_wait: Arc<CondWait>, +    buffer: Arc<Mutex<AllocRingBuffer<T>>>, +) -> Result<()> { +    for event in buffer.lock().await.iter() { +        if let Err(err) = sub.notify(serde_json::json!(event)).await { +            return Err(Error::Other(format!("failed to notify: {err}"))); +        } +    } +    loop { +        cond_wait.wait().await; +        cond_wait.reset().await; +        if let Some(event) = buffer.lock().await.back().cloned() { +            if let Err(err) = sub.notify(serde_json::json!(event)).await { +                return Err(Error::Other(format!("failed to notify: {err}"))); +            } +        } +    } +} + +async fn notify_failed(result: TaskResult<Result<()>>) { +    if let TaskResult::Completed(Err(err)) = result { +        error!("Error: {err}"); +    } +} diff --git a/p2p/examples/monitor/src/shared.rs b/p2p/examples/monitor/src/shared.rs new file mode 100644 index 0000000..0e8079c --- /dev/null +++ b/p2p/examples/monitor/src/shared.rs @@ -0,0 +1,31 @@ +use std::{num::NonZeroUsize, sync::Arc, thread}; + +use easy_parallel::Parallel; +use smol::{channel, future, future::Future, Executor}; + +/// Returns an estimate of the default amount of parallelism a program should use. +/// see `std::thread::available_parallelism` +pub fn available_parallelism() -> usize { +    thread::available_parallelism() +        .map(NonZeroUsize::get) +        .unwrap_or(1) +} + +/// Run a multi-threaded executor +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) { +    let (signal, shutdown) = channel::unbounded::<()>(); + +    let num_threads = available_parallelism(); + +    Parallel::new() +        .each(0..(num_threads), |_| { +            future::block_on(ex.run(shutdown.recv())) +        }) +        // Run the main future on the current thread. +        .finish(|| { +            future::block_on(async { +                main_future.await; +                drop(signal); +            }) +        }); +} | 
