From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- p2p/src/backend.rs | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 p2p/src/backend.rs (limited to 'p2p/src/backend.rs') diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs new file mode 100644 index 0000000..290e3e7 --- /dev/null +++ b/p2p/src/backend.rs @@ -0,0 +1,139 @@ +use std::sync::Arc; + +use log::info; + +use karyons_core::{pubsub::Subscription, Executor}; + +use crate::{ + config::Config, + discovery::{ArcDiscovery, Discovery}, + monitor::{Monitor, MonitorEvent}, + net::ConnQueue, + peer_pool::PeerPool, + protocol::{ArcProtocol, Protocol}, + ArcPeer, PeerID, Result, +}; + +pub type ArcBackend = Arc; + +/// Backend serves as the central entry point for initiating and managing +/// the P2P network. +/// +/// +/// # Example +/// ``` +/// use std::sync::Arc; +/// +/// use easy_parallel::Parallel; +/// use smol::{channel as smol_channel, future, Executor}; +/// +/// use karyons_p2p::{Backend, Config, PeerID}; +/// +/// let peer_id = PeerID::random(); +/// +/// // Create the configuration for the backend. +/// let mut config = Config::default(); +/// +/// // Create a new Backend +/// let backend = Backend::new(peer_id, config); +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let task = async { +/// // Run the backend +/// backend.run(ex.clone()).await.unwrap(); +/// +/// // .... +/// +/// // Shutdown the backend +/// backend.shutdown().await; +/// }; +/// +/// future::block_on(ex.run(task)); +/// +/// ``` +pub struct Backend { + /// The Configuration for the P2P network. + config: Arc, + + /// Peer ID. + id: PeerID, + + /// Responsible for network and system monitoring. + monitor: Arc, + + /// Discovery instance. + discovery: ArcDiscovery, + + /// PeerPool instance. + peer_pool: Arc, +} + +impl Backend { + /// Creates a new Backend. + pub fn new(id: PeerID, config: Config) -> ArcBackend { + let config = Arc::new(config); + let monitor = Arc::new(Monitor::new()); + + let conn_queue = ConnQueue::new(); + + let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone()); + let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone()); + + Arc::new(Self { + id: id.clone(), + monitor, + discovery, + config, + peer_pool, + }) + } + + /// Run the Backend, starting the PeerPool and Discovery instances. + pub async fn run(self: &Arc, ex: Executor<'_>) -> Result<()> { + info!("Run the backend {}", self.id); + self.peer_pool.start(ex.clone()).await?; + self.discovery.start(ex.clone()).await?; + Ok(()) + } + + /// Attach a custom protocol to the network + pub async fn attach_protocol( + &self, + c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static, + ) -> Result<()> { + self.peer_pool.attach_protocol::

(Box::new(c)).await + } + + /// Returns the number of currently connected peers. + pub async fn peers(&self) -> usize { + self.peer_pool.peers_len().await + } + + /// Returns the `Config`. + pub fn config(&self) -> Arc { + self.config.clone() + } + + /// Returns the number of occupied inbound slots. + pub fn inbound_slots(&self) -> usize { + self.discovery.inbound_slots.load() + } + + /// Returns the number of occupied outbound slots. + pub fn outbound_slots(&self) -> usize { + self.discovery.outbound_slots.load() + } + + /// Subscribes to the monitor to receive network events. + pub async fn monitor(&self) -> Subscription { + self.monitor.subscribe().await + } + + /// Shuts down the Backend. + pub async fn shutdown(&self) { + self.discovery.shutdown().await; + self.peer_pool.shutdown().await; + } +} -- cgit v1.2.3