aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/src/endpoint.rs6
-rw-r--r--p2p/examples/monitor/.gitignore2
-rw-r--r--p2p/examples/monitor/Cargo.toml30
-rw-r--r--p2p/examples/monitor/src/client.rs79
-rw-r--r--p2p/examples/monitor/src/main.rs108
-rw-r--r--p2p/examples/monitor/src/service.rs300
-rw-r--r--p2p/examples/monitor/src/shared.rs31
7 files changed, 6 insertions, 550 deletions
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs
index 9fb949b..c3626ec 100644
--- a/net/src/endpoint.rs
+++ b/net/src/endpoint.rs
@@ -35,6 +35,7 @@ pub type Port = u16;
///
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", serde(into = "String"))]
pub enum Endpoint {
Udp(Addr, Port),
Tcp(Addr, Port),
@@ -68,6 +69,11 @@ impl std::fmt::Display for Endpoint {
}
}
}
+impl From<Endpoint> for String {
+ fn from(endpoint: Endpoint) -> String {
+ endpoint.to_string()
+ }
+}
impl TryFrom<Endpoint> for SocketAddr {
type Error = Error;
diff --git a/p2p/examples/monitor/.gitignore b/p2p/examples/monitor/.gitignore
deleted file mode 100644
index a9d37c5..0000000
--- a/p2p/examples/monitor/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-target
-Cargo.lock
diff --git a/p2p/examples/monitor/Cargo.toml b/p2p/examples/monitor/Cargo.toml
deleted file mode 100644
index 52bab15..0000000
--- a/p2p/examples/monitor/Cargo.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-[package]
-name = "monitor"
-version = "0.1.0"
-edition = "2021"
-
-[workspace]
-
-[dependencies]
-karyon_core = { path = "../../../core", features = ["crypto"] }
-karyon_p2p = { path = "../../", features = ["serde"] }
-karyon_jsonrpc = { path = "../../../jsonrpc", features = ["ws"] }
-clap = { version = "4.5.4", features = ["derive"] }
-ctrlc = "3.4.4"
-env_logger = "0.11.3"
-log = "0.4.21"
-serde = { version = "1.0.203", features = ["derive"] }
-smol = "2.0.0"
-serde_json = "1.0.117"
-easy-parallel = "3.3.1"
-ringbuffer = "0.15.0"
-futures = "0.3.30"
-
-[[bin]]
-name = "client"
-path = "src/client.rs"
-
-[[bin]]
-name = "monitor"
-path = "src/main.rs"
-
diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs
deleted file mode 100644
index 27e7b6f..0000000
--- a/p2p/examples/monitor/src/client.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use clap::Parser;
-use serde::{Deserialize, Serialize};
-
-use karyon_jsonrpc::Client;
-use karyon_p2p::endpoint::Endpoint;
-
-#[derive(Parser)]
-#[command(author, version, about, long_about = None)]
-struct Cli {
- /// RPC server endpoint.
- #[arg(short)]
- rpc_endpoint: Endpoint,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Pong {}
-
-fn main() {
- smol::block_on(async {
- env_logger::init();
- let cli = Cli::parse();
-
- let rpc = Client::builder(cli.rpc_endpoint)
- .expect("Create rpc client builder")
- .build()
- .await
- .expect("Create rpc client");
-
- let sub = rpc
- .subscribe("MonitorRPC.conn_subscribe", ())
- .await
- .expect("Subscribe to connection events");
-
- let sub2 = rpc
- .subscribe("MonitorRPC.peer_pool_subscribe", ())
- .await
- .expect("Subscribe to peer pool events");
-
- let sub3 = rpc
- .subscribe("MonitorRPC.discovery_subscribe", ())
- .await
- .expect("Subscribe to discovery events");
-
- smol::spawn(async move {
- loop {
- let event = sub.recv().await.expect("Receive connection event");
- println!("Receive new connection event: {event}");
- }
- })
- .detach();
-
- smol::spawn(async move {
- loop {
- let event = sub2.recv().await.expect("Receive peer pool event");
- println!("Receive new peerpool event: {event}");
- }
- })
- .detach();
-
- smol::spawn(async move {
- loop {
- let event = sub3.recv().await.expect("Receive discovery event");
- println!("Receive new discovery event: {event}");
- }
- })
- .detach();
-
- // start ping-pong loop
- loop {
- smol::Timer::after(std::time::Duration::from_secs(1)).await;
- let _: Pong = rpc
- .call("MonitorRPC.ping", ())
- .await
- .expect("Receive pong message");
-
- println!("Receive pong message");
- }
- });
-}
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
deleted file mode 100644
index 78ada48..0000000
--- a/p2p/examples/monitor/src/main.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-mod service;
-mod shared;
-
-use std::sync::Arc;
-
-use clap::Parser;
-use smol::{channel, Executor};
-
-use karyon_jsonrpc::Server;
-use karyon_p2p::{
- endpoint::{Endpoint, Port},
- keypair::{KeyPair, KeyPairType},
- Backend, Config,
-};
-
-use service::MonitorRPC;
-use shared::run_executor;
-
-#[derive(Parser)]
-#[command(author, version, about, long_about = None)]
-struct Cli {
- /// Optional list of bootstrap peers to start the seeding process.
- #[arg(short)]
- bootstrap_peers: Vec<Endpoint>,
-
- /// RPC server endpoint.
- #[arg(short)]
- rpc_endpoint: Endpoint,
-
- /// Optional list of peer endpoints for manual connections.
- #[arg(short)]
- peer_endpoints: Vec<Endpoint>,
-
- /// Optional endpoint for accepting incoming connections.
- #[arg(short)]
- listen_endpoint: Option<Endpoint>,
-
- /// Optional TCP/UDP port for the discovery service.
- #[arg(short)]
- discovery_port: Option<Port>,
-}
-
-fn main() {
- env_logger::init();
-
- let cli = Cli::parse();
-
- let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
-
- // Create the configuration for the backend.
- let config = Config {
- listen_endpoint: cli.listen_endpoint,
- peer_endpoints: cli.peer_endpoints,
- bootstrap_peers: cli.bootstrap_peers,
- discovery_port: cli.discovery_port.unwrap_or(0),
- enable_monitor: true,
- ..Default::default()
- };
-
- // Create a new Executor
- let ex = Arc::new(Executor::new());
-
- // Create a new Backend
- let backend = Backend::new(&key_pair, config, ex.clone().into());
-
- let (ctrlc_s, ctrlc_r) = channel::unbounded();
- let handle = move || ctrlc_s.try_send(()).unwrap();
- ctrlc::set_handler(handle).unwrap();
-
- let exc = ex.clone();
- run_executor(
- async {
- // RPC service
- let service = MonitorRPC::new(backend.clone(), exc.clone());
-
- // Create rpc server
- let server = Server::builder(cli.rpc_endpoint)
- .expect("Create server builder")
- .service(service.clone())
- .pubsub_service(service.clone())
- .build_with_executor(exc.clone().into())
- .await
- .expect("Build rpc server");
-
- // Run the RPC server
- server.start();
-
- // Run the RPC Service
- service.run().await.expect("Run monitor rpc service");
-
- // Run the backend
- backend.run().await.expect("Run p2p backend");
-
- // Wait for ctrlc signal
- ctrlc_r.recv().await.expect("Wait for ctrlc signal");
-
- // Shutdown the backend
- backend.shutdown().await;
-
- // Shutdown the RPC server
- server.shutdown().await;
-
- // Shutdown the RPC service
- service.shutdown().await;
- },
- ex,
- );
-}
diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs
deleted file mode 100644
index bc6ab81..0000000
--- a/p2p/examples/monitor/src/service.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use futures::stream::{FuturesUnordered, StreamExt};
-use log::{debug, error};
-use ringbuffer::{AllocRingBuffer, RingBuffer};
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use smol::{lock::Mutex, Executor};
-
-use karyon_core::async_util::{TaskGroup, TaskResult};
-use karyon_jsonrpc::{
- message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription,
-};
-use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result};
-
-const EVENT_BUFFER_SIZE: usize = 60;
-
-pub struct MonitorRPC {
- backend: ArcBackend,
- subs: Arc<Subscriptions>,
- task_group: TaskGroup,
-}
-
-impl MonitorRPC {
- pub fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
- Arc::new(MonitorRPC {
- backend,
- task_group: TaskGroup::with_executor(ex.clone().into()),
- subs: Subscriptions::new(ex),
- })
- }
-
- pub async fn run(self: &Arc<Self>) -> Result<()> {
- let conn_events = self.backend.monitor().conn_events().await;
- let peer_pool_events = self.backend.monitor().peer_pool_events().await;
- let discovery_events = self.backend.monitor().discovery_events().await;
-
- let on_failuer = |res: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Event receive loop: {err}")
- }
- };
-
- let selfc = self.clone();
- self.task_group.spawn(
- async move {
- loop {
- let event = conn_events.recv().await?;
- selfc.subs.notify(MonitorTopic::Conn, event).await;
- }
- },
- on_failuer,
- );
-
- let selfc = self.clone();
- self.task_group.spawn(
- async move {
- loop {
- let event = peer_pool_events.recv().await?;
- selfc.subs.notify(MonitorTopic::PeerPool, event).await;
- }
- },
- on_failuer,
- );
-
- let selfc = self.clone();
- self.task_group.spawn(
- async move {
- loop {
- let event = discovery_events.recv().await?;
- selfc.subs.notify(MonitorTopic::Discovery, event).await;
- }
- },
- on_failuer,
- );
-
- Ok(())
- }
-
- pub async fn shutdown(&self) {
- self.task_group.cancel().await;
- self.subs.stop().await;
- }
-}
-
-#[rpc_impl]
-impl MonitorRPC {
- pub async fn ping(&self, _params: Value) -> RPCResult<Value> {
- Ok(serde_json::json!(Pong {}))
- }
-
- pub async fn peer_id(&self, _params: Value) -> RPCResult<Value> {
- Ok(serde_json::json!(self.backend.peer_id().to_string()))
- }
-
- pub async fn inbound_connection(&self, _params: Value) -> RPCResult<Value> {
- Ok(serde_json::json!(self.backend.inbound_slots()))
- }
-
- pub async fn outbound_connection(&self, _params: Value) -> RPCResult<Value> {
- Ok(serde_json::json!(self.backend.outbound_slots()))
- }
-}
-
-#[rpc_pubsub_impl]
-impl MonitorRPC {
- pub async fn conn_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: Value,
- ) -> RPCResult<Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- self.subs.add(MonitorTopic::Conn, sub).await;
-
- Ok(serde_json::json!(sub_id))
- }
-
- pub async fn peer_pool_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: Value,
- ) -> RPCResult<Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- self.subs.add(MonitorTopic::PeerPool, sub).await;
-
- Ok(serde_json::json!(sub_id))
- }
-
- pub async fn discovery_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: Value,
- ) -> RPCResult<Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- self.subs.add(MonitorTopic::Discovery, sub).await;
-
- Ok(serde_json::json!(sub_id))
- }
-
- pub async fn conn_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: Value,
- ) -> RPCResult<Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- pub async fn peer_pool_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: Value,
- ) -> RPCResult<Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- pub async fn discovery_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: Value,
- ) -> RPCResult<Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-}
-
-#[derive(Deserialize, Serialize)]
-struct Pong {}
-
-struct Subscriptions {
- subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>,
- buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<Value>>>,
- task_group: TaskGroup,
-}
-
-impl Subscriptions {
- fn new(ex: Arc<Executor<'static>>) -> Arc<Self> {
- let mut subs = HashMap::new();
- subs.insert(MonitorTopic::Conn, HashMap::new());
- subs.insert(MonitorTopic::PeerPool, HashMap::new());
- subs.insert(MonitorTopic::Discovery, HashMap::new());
-
- let mut buffer = HashMap::new();
- buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE));
- buffer.insert(
- MonitorTopic::PeerPool,
- AllocRingBuffer::new(EVENT_BUFFER_SIZE),
- );
- buffer.insert(
- MonitorTopic::Discovery,
- AllocRingBuffer::new(EVENT_BUFFER_SIZE),
- );
-
- Arc::new(Self {
- subs: Mutex::new(subs),
- buffer: Mutex::new(buffer),
- task_group: TaskGroup::with_executor(ex.into()),
- })
- }
-
- /// Adds the subscription to the subs map according to the given type
- async fn add(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
- match self.subs.lock().await.get_mut(&ty) {
- Some(subs) => {
- subs.insert(sub.id, sub.clone());
- }
- None => todo!(),
- }
- // Send old events in the buffer to the subscriber
- self.send_old_events(ty, sub).await;
- }
-
- /// Notifies all subscribers
- async fn notify<T: Serialize>(&self, ty: MonitorTopic, event: T) {
- let event = serde_json::json!(event);
- // Add the new event to the ringbuffer
- match self.buffer.lock().await.get_mut(&ty) {
- Some(events) => events.push(event.clone()),
- None => todo!(),
- }
-
- // Notify the subscribers
- match self.subs.lock().await.get_mut(&ty) {
- Some(subs) => {
- let mut fulist = FuturesUnordered::new();
-
- for sub in subs.values() {
- let fu = async { (sub.id, sub.notify(event.clone()).await) };
- fulist.push(fu)
- }
-
- let mut cleanup_list = vec![];
- while let Some((sub_id, result)) = fulist.next().await {
- if let Err(err) = result {
- error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty);
- cleanup_list.push(sub_id);
- }
- }
- drop(fulist);
-
- for sub_id in cleanup_list {
- subs.remove(&sub_id);
- }
- }
- None => todo!(),
- }
- }
-
- /// Sends old events in the ringbuffer to the new subscriber.
- async fn send_old_events(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
- let ty_cloned = ty.clone();
- let sub_id = sub.id;
- let on_complete = move |res: TaskResult<()>| async move {
- debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id);
- };
-
- let selfc = self.clone();
- self.task_group.spawn(
- async move {
- match selfc.buffer.lock().await.get_mut(&ty) {
- Some(events) => {
- let mut fu = FuturesUnordered::new();
-
- for event in events.iter().rev() {
- fu.push(sub.notify(event.clone()))
- }
-
- while let Some(result) = fu.next().await {
- if result.is_err() {
- return;
- }
- }
- }
- None => todo!(),
- }
- },
- on_complete,
- );
- }
-
- async fn stop(&self) {
- self.task_group.cancel().await;
- }
-}
diff --git a/p2p/examples/monitor/src/shared.rs b/p2p/examples/monitor/src/shared.rs
deleted file mode 100644
index 0e8079c..0000000
--- a/p2p/examples/monitor/src/shared.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::{num::NonZeroUsize, sync::Arc, thread};
-
-use easy_parallel::Parallel;
-use smol::{channel, future, future::Future, Executor};
-
-/// Returns an estimate of the default amount of parallelism a program should use.
-/// see `std::thread::available_parallelism`
-pub fn available_parallelism() -> usize {
- thread::available_parallelism()
- .map(NonZeroUsize::get)
- .unwrap_or(1)
-}
-
-/// Run a multi-threaded executor
-pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) {
- let (signal, shutdown) = channel::unbounded::<()>();
-
- let num_threads = available_parallelism();
-
- Parallel::new()
- .each(0..(num_threads), |_| {
- future::block_on(ex.run(shutdown.recv()))
- })
- // Run the main future on the current thread.
- .finish(|| {
- future::block_on(async {
- main_future.await;
- drop(signal);
- })
- });
-}