diff options
-rw-r--r-- | net/src/endpoint.rs | 6 | ||||
-rw-r--r-- | p2p/examples/monitor/.gitignore | 2 | ||||
-rw-r--r-- | p2p/examples/monitor/Cargo.toml | 30 | ||||
-rw-r--r-- | p2p/examples/monitor/src/client.rs | 79 | ||||
-rw-r--r-- | p2p/examples/monitor/src/main.rs | 108 | ||||
-rw-r--r-- | p2p/examples/monitor/src/service.rs | 300 | ||||
-rw-r--r-- | p2p/examples/monitor/src/shared.rs | 31 |
7 files changed, 6 insertions, 550 deletions
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs index 9fb949b..c3626ec 100644 --- a/net/src/endpoint.rs +++ b/net/src/endpoint.rs @@ -35,6 +35,7 @@ pub type Port = u16; /// #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(into = "String"))] pub enum Endpoint { Udp(Addr, Port), Tcp(Addr, Port), @@ -68,6 +69,11 @@ impl std::fmt::Display for Endpoint { } } } +impl From<Endpoint> for String { + fn from(endpoint: Endpoint) -> String { + endpoint.to_string() + } +} impl TryFrom<Endpoint> for SocketAddr { type Error = Error; diff --git a/p2p/examples/monitor/.gitignore b/p2p/examples/monitor/.gitignore deleted file mode 100644 index a9d37c5..0000000 --- a/p2p/examples/monitor/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -target -Cargo.lock diff --git a/p2p/examples/monitor/Cargo.toml b/p2p/examples/monitor/Cargo.toml deleted file mode 100644 index 52bab15..0000000 --- a/p2p/examples/monitor/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -[package] -name = "monitor" -version = "0.1.0" -edition = "2021" - -[workspace] - -[dependencies] -karyon_core = { path = "../../../core", features = ["crypto"] } -karyon_p2p = { path = "../../", features = ["serde"] } -karyon_jsonrpc = { path = "../../../jsonrpc", features = ["ws"] } -clap = { version = "4.5.4", features = ["derive"] } -ctrlc = "3.4.4" -env_logger = "0.11.3" -log = "0.4.21" -serde = { version = "1.0.203", features = ["derive"] } -smol = "2.0.0" -serde_json = "1.0.117" -easy-parallel = "3.3.1" -ringbuffer = "0.15.0" -futures = "0.3.30" - -[[bin]] -name = "client" -path = "src/client.rs" - -[[bin]] -name = "monitor" -path = "src/main.rs" - diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs deleted file mode 100644 index 27e7b6f..0000000 --- a/p2p/examples/monitor/src/client.rs +++ /dev/null @@ -1,79 +0,0 @@ -use clap::Parser; -use serde::{Deserialize, Serialize}; - -use karyon_jsonrpc::Client; -use karyon_p2p::endpoint::Endpoint; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -struct Cli { - /// RPC server endpoint. - #[arg(short)] - rpc_endpoint: Endpoint, -} - -#[derive(Deserialize, Serialize)] -struct Pong {} - -fn main() { - smol::block_on(async { - env_logger::init(); - let cli = Cli::parse(); - - let rpc = Client::builder(cli.rpc_endpoint) - .expect("Create rpc client builder") - .build() - .await - .expect("Create rpc client"); - - let sub = rpc - .subscribe("MonitorRPC.conn_subscribe", ()) - .await - .expect("Subscribe to connection events"); - - let sub2 = rpc - .subscribe("MonitorRPC.peer_pool_subscribe", ()) - .await - .expect("Subscribe to peer pool events"); - - let sub3 = rpc - .subscribe("MonitorRPC.discovery_subscribe", ()) - .await - .expect("Subscribe to discovery events"); - - smol::spawn(async move { - loop { - let event = sub.recv().await.expect("Receive connection event"); - println!("Receive new connection event: {event}"); - } - }) - .detach(); - - smol::spawn(async move { - loop { - let event = sub2.recv().await.expect("Receive peer pool event"); - println!("Receive new peerpool event: {event}"); - } - }) - .detach(); - - smol::spawn(async move { - loop { - let event = sub3.recv().await.expect("Receive discovery event"); - println!("Receive new discovery event: {event}"); - } - }) - .detach(); - - // start ping-pong loop - loop { - smol::Timer::after(std::time::Duration::from_secs(1)).await; - let _: Pong = rpc - .call("MonitorRPC.ping", ()) - .await - .expect("Receive pong message"); - - println!("Receive pong message"); - } - }); -} diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs deleted file mode 100644 index 78ada48..0000000 --- a/p2p/examples/monitor/src/main.rs +++ /dev/null @@ -1,108 +0,0 @@ -mod service; -mod shared; - -use std::sync::Arc; - -use clap::Parser; -use smol::{channel, Executor}; - -use karyon_jsonrpc::Server; -use karyon_p2p::{ - endpoint::{Endpoint, Port}, - keypair::{KeyPair, KeyPairType}, - Backend, Config, -}; - -use service::MonitorRPC; -use shared::run_executor; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -struct Cli { - /// Optional list of bootstrap peers to start the seeding process. - #[arg(short)] - bootstrap_peers: Vec<Endpoint>, - - /// RPC server endpoint. - #[arg(short)] - rpc_endpoint: Endpoint, - - /// Optional list of peer endpoints for manual connections. - #[arg(short)] - peer_endpoints: Vec<Endpoint>, - - /// Optional endpoint for accepting incoming connections. - #[arg(short)] - listen_endpoint: Option<Endpoint>, - - /// Optional TCP/UDP port for the discovery service. - #[arg(short)] - discovery_port: Option<Port>, -} - -fn main() { - env_logger::init(); - - let cli = Cli::parse(); - - let key_pair = KeyPair::generate(&KeyPairType::Ed25519); - - // Create the configuration for the backend. - let config = Config { - listen_endpoint: cli.listen_endpoint, - peer_endpoints: cli.peer_endpoints, - bootstrap_peers: cli.bootstrap_peers, - discovery_port: cli.discovery_port.unwrap_or(0), - enable_monitor: true, - ..Default::default() - }; - - // Create a new Executor - let ex = Arc::new(Executor::new()); - - // Create a new Backend - let backend = Backend::new(&key_pair, config, ex.clone().into()); - - let (ctrlc_s, ctrlc_r) = channel::unbounded(); - let handle = move || ctrlc_s.try_send(()).unwrap(); - ctrlc::set_handler(handle).unwrap(); - - let exc = ex.clone(); - run_executor( - async { - // RPC service - let service = MonitorRPC::new(backend.clone(), exc.clone()); - - // Create rpc server - let server = Server::builder(cli.rpc_endpoint) - .expect("Create server builder") - .service(service.clone()) - .pubsub_service(service.clone()) - .build_with_executor(exc.clone().into()) - .await - .expect("Build rpc server"); - - // Run the RPC server - server.start(); - - // Run the RPC Service - service.run().await.expect("Run monitor rpc service"); - - // Run the backend - backend.run().await.expect("Run p2p backend"); - - // Wait for ctrlc signal - ctrlc_r.recv().await.expect("Wait for ctrlc signal"); - - // Shutdown the backend - backend.shutdown().await; - - // Shutdown the RPC server - server.shutdown().await; - - // Shutdown the RPC service - service.shutdown().await; - }, - ex, - ); -} diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs deleted file mode 100644 index bc6ab81..0000000 --- a/p2p/examples/monitor/src/service.rs +++ /dev/null @@ -1,300 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use futures::stream::{FuturesUnordered, StreamExt}; -use log::{debug, error}; -use ringbuffer::{AllocRingBuffer, RingBuffer}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use smol::{lock::Mutex, Executor}; - -use karyon_core::async_util::{TaskGroup, TaskResult}; -use karyon_jsonrpc::{ - message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription, -}; -use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result}; - -const EVENT_BUFFER_SIZE: usize = 60; - -pub struct MonitorRPC { - backend: ArcBackend, - subs: Arc<Subscriptions>, - task_group: TaskGroup, -} - -impl MonitorRPC { - pub fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> { - Arc::new(MonitorRPC { - backend, - task_group: TaskGroup::with_executor(ex.clone().into()), - subs: Subscriptions::new(ex), - }) - } - - pub async fn run(self: &Arc<Self>) -> Result<()> { - let conn_events = self.backend.monitor().conn_events().await; - let peer_pool_events = self.backend.monitor().peer_pool_events().await; - let discovery_events = self.backend.monitor().discovery_events().await; - - let on_failuer = |res: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = res { - error!("Event receive loop: {err}") - } - }; - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = conn_events.recv().await?; - selfc.subs.notify(MonitorTopic::Conn, event).await; - } - }, - on_failuer, - ); - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = peer_pool_events.recv().await?; - selfc.subs.notify(MonitorTopic::PeerPool, event).await; - } - }, - on_failuer, - ); - - let selfc = self.clone(); - self.task_group.spawn( - async move { - loop { - let event = discovery_events.recv().await?; - selfc.subs.notify(MonitorTopic::Discovery, event).await; - } - }, - on_failuer, - ); - - Ok(()) - } - - pub async fn shutdown(&self) { - self.task_group.cancel().await; - self.subs.stop().await; - } -} - -#[rpc_impl] -impl MonitorRPC { - pub async fn ping(&self, _params: Value) -> RPCResult<Value> { - Ok(serde_json::json!(Pong {})) - } - - pub async fn peer_id(&self, _params: Value) -> RPCResult<Value> { - Ok(serde_json::json!(self.backend.peer_id().to_string())) - } - - pub async fn inbound_connection(&self, _params: Value) -> RPCResult<Value> { - Ok(serde_json::json!(self.backend.inbound_slots())) - } - - pub async fn outbound_connection(&self, _params: Value) -> RPCResult<Value> { - Ok(serde_json::json!(self.backend.outbound_slots())) - } -} - -#[rpc_pubsub_impl] -impl MonitorRPC { - pub async fn conn_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: Value, - ) -> RPCResult<Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::Conn, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn peer_pool_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: Value, - ) -> RPCResult<Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::PeerPool, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn discovery_subscribe( - &self, - chan: Arc<Channel>, - method: String, - _params: Value, - ) -> RPCResult<Value> { - let sub = chan.new_subscription(&method).await; - let sub_id = sub.id; - - self.subs.add(MonitorTopic::Discovery, sub).await; - - Ok(serde_json::json!(sub_id)) - } - - pub async fn conn_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: Value, - ) -> RPCResult<Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - pub async fn peer_pool_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: Value, - ) -> RPCResult<Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } - - pub async fn discovery_unsubscribe( - &self, - chan: Arc<Channel>, - _method: String, - params: Value, - ) -> RPCResult<Value> { - let sub_id: SubscriptionID = serde_json::from_value(params)?; - chan.remove_subscription(&sub_id).await; - Ok(serde_json::json!(true)) - } -} - -#[derive(Deserialize, Serialize)] -struct Pong {} - -struct Subscriptions { - subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>, - buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<Value>>>, - task_group: TaskGroup, -} - -impl Subscriptions { - fn new(ex: Arc<Executor<'static>>) -> Arc<Self> { - let mut subs = HashMap::new(); - subs.insert(MonitorTopic::Conn, HashMap::new()); - subs.insert(MonitorTopic::PeerPool, HashMap::new()); - subs.insert(MonitorTopic::Discovery, HashMap::new()); - - let mut buffer = HashMap::new(); - buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE)); - buffer.insert( - MonitorTopic::PeerPool, - AllocRingBuffer::new(EVENT_BUFFER_SIZE), - ); - buffer.insert( - MonitorTopic::Discovery, - AllocRingBuffer::new(EVENT_BUFFER_SIZE), - ); - - Arc::new(Self { - subs: Mutex::new(subs), - buffer: Mutex::new(buffer), - task_group: TaskGroup::with_executor(ex.into()), - }) - } - - /// Adds the subscription to the subs map according to the given type - async fn add(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) { - match self.subs.lock().await.get_mut(&ty) { - Some(subs) => { - subs.insert(sub.id, sub.clone()); - } - None => todo!(), - } - // Send old events in the buffer to the subscriber - self.send_old_events(ty, sub).await; - } - - /// Notifies all subscribers - async fn notify<T: Serialize>(&self, ty: MonitorTopic, event: T) { - let event = serde_json::json!(event); - // Add the new event to the ringbuffer - match self.buffer.lock().await.get_mut(&ty) { - Some(events) => events.push(event.clone()), - None => todo!(), - } - - // Notify the subscribers - match self.subs.lock().await.get_mut(&ty) { - Some(subs) => { - let mut fulist = FuturesUnordered::new(); - - for sub in subs.values() { - let fu = async { (sub.id, sub.notify(event.clone()).await) }; - fulist.push(fu) - } - - let mut cleanup_list = vec![]; - while let Some((sub_id, result)) = fulist.next().await { - if let Err(err) = result { - error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty); - cleanup_list.push(sub_id); - } - } - drop(fulist); - - for sub_id in cleanup_list { - subs.remove(&sub_id); - } - } - None => todo!(), - } - } - - /// Sends old events in the ringbuffer to the new subscriber. - async fn send_old_events(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) { - let ty_cloned = ty.clone(); - let sub_id = sub.id; - let on_complete = move |res: TaskResult<()>| async move { - debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id); - }; - - let selfc = self.clone(); - self.task_group.spawn( - async move { - match selfc.buffer.lock().await.get_mut(&ty) { - Some(events) => { - let mut fu = FuturesUnordered::new(); - - for event in events.iter().rev() { - fu.push(sub.notify(event.clone())) - } - - while let Some(result) = fu.next().await { - if result.is_err() { - return; - } - } - } - None => todo!(), - } - }, - on_complete, - ); - } - - async fn stop(&self) { - self.task_group.cancel().await; - } -} diff --git a/p2p/examples/monitor/src/shared.rs b/p2p/examples/monitor/src/shared.rs deleted file mode 100644 index 0e8079c..0000000 --- a/p2p/examples/monitor/src/shared.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::{num::NonZeroUsize, sync::Arc, thread}; - -use easy_parallel::Parallel; -use smol::{channel, future, future::Future, Executor}; - -/// Returns an estimate of the default amount of parallelism a program should use. -/// see `std::thread::available_parallelism` -pub fn available_parallelism() -> usize { - thread::available_parallelism() - .map(NonZeroUsize::get) - .unwrap_or(1) -} - -/// Run a multi-threaded executor -pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) { - let (signal, shutdown) = channel::unbounded::<()>(); - - let num_threads = available_parallelism(); - - Parallel::new() - .each(0..(num_threads), |_| { - future::block_on(ex.run(shutdown.recv())) - }) - // Run the main future on the current thread. - .finish(|| { - future::block_on(async { - main_future.await; - drop(signal); - }) - }); -} |