diff options
author | hozan23 <hozan23@proton.me> | 2023-11-08 13:03:27 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-08 13:03:27 +0300 |
commit | 4fe665fc8bc6265baf5bfba6b6a5f3ee2dba63dc (patch) | |
tree | 77c7c40c9725539546e53b00f424deafe5ec81a8 /karyons_p2p/src/backend.rs |
first commit
Diffstat (limited to 'karyons_p2p/src/backend.rs')
-rw-r--r-- | karyons_p2p/src/backend.rs | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/karyons_p2p/src/backend.rs b/karyons_p2p/src/backend.rs new file mode 100644 index 0000000..290e3e7 --- /dev/null +++ b/karyons_p2p/src/backend.rs @@ -0,0 +1,139 @@ +use std::sync::Arc; + +use log::info; + +use karyons_core::{pubsub::Subscription, Executor}; + +use crate::{ + config::Config, + discovery::{ArcDiscovery, Discovery}, + monitor::{Monitor, MonitorEvent}, + net::ConnQueue, + peer_pool::PeerPool, + protocol::{ArcProtocol, Protocol}, + ArcPeer, PeerID, Result, +}; + +pub type ArcBackend = Arc<Backend>; + +/// Backend serves as the central entry point for initiating and managing +/// the P2P network. +/// +/// +/// # Example +/// ``` +/// use std::sync::Arc; +/// +/// use easy_parallel::Parallel; +/// use smol::{channel as smol_channel, future, Executor}; +/// +/// use karyons_p2p::{Backend, Config, PeerID}; +/// +/// let peer_id = PeerID::random(); +/// +/// // Create the configuration for the backend. +/// let mut config = Config::default(); +/// +/// // Create a new Backend +/// let backend = Backend::new(peer_id, config); +/// +/// // Create a new Executor +/// let ex = Arc::new(Executor::new()); +/// +/// let task = async { +/// // Run the backend +/// backend.run(ex.clone()).await.unwrap(); +/// +/// // .... +/// +/// // Shutdown the backend +/// backend.shutdown().await; +/// }; +/// +/// future::block_on(ex.run(task)); +/// +/// ``` +pub struct Backend { + /// The Configuration for the P2P network. + config: Arc<Config>, + + /// Peer ID. + id: PeerID, + + /// Responsible for network and system monitoring. + monitor: Arc<Monitor>, + + /// Discovery instance. + discovery: ArcDiscovery, + + /// PeerPool instance. + peer_pool: Arc<PeerPool>, +} + +impl Backend { + /// Creates a new Backend. + pub fn new(id: PeerID, config: Config) -> ArcBackend { + let config = Arc::new(config); + let monitor = Arc::new(Monitor::new()); + + let conn_queue = ConnQueue::new(); + + let peer_pool = PeerPool::new(&id, conn_queue.clone(), config.clone(), monitor.clone()); + let discovery = Discovery::new(&id, conn_queue, config.clone(), monitor.clone()); + + Arc::new(Self { + id: id.clone(), + monitor, + discovery, + config, + peer_pool, + }) + } + + /// Run the Backend, starting the PeerPool and Discovery instances. + pub async fn run(self: &Arc<Self>, ex: Executor<'_>) -> Result<()> { + info!("Run the backend {}", self.id); + self.peer_pool.start(ex.clone()).await?; + self.discovery.start(ex.clone()).await?; + Ok(()) + } + + /// Attach a custom protocol to the network + pub async fn attach_protocol<P: Protocol>( + &self, + c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static, + ) -> Result<()> { + self.peer_pool.attach_protocol::<P>(Box::new(c)).await + } + + /// Returns the number of currently connected peers. + pub async fn peers(&self) -> usize { + self.peer_pool.peers_len().await + } + + /// Returns the `Config`. + pub fn config(&self) -> Arc<Config> { + self.config.clone() + } + + /// Returns the number of occupied inbound slots. + pub fn inbound_slots(&self) -> usize { + self.discovery.inbound_slots.load() + } + + /// Returns the number of occupied outbound slots. + pub fn outbound_slots(&self) -> usize { + self.discovery.outbound_slots.load() + } + + /// Subscribes to the monitor to receive network events. + pub async fn monitor(&self) -> Subscription<MonitorEvent> { + self.monitor.subscribe().await + } + + /// Shuts down the Backend. + pub async fn shutdown(&self) { + self.discovery.shutdown().await; + self.peer_pool.shutdown().await; + } +} |