aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/backend.rs
blob: bb0d891b28e84779b4c806850065bc41fce418c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::sync::Arc;

use log::info;

use karyons_core::{pubsub::Subscription, GlobalExecutor};

use crate::{
    config::Config,
    connection::ConnQueue,
    discovery::{ArcDiscovery, Discovery},
    monitor::{Monitor, MonitorEvent},
    peer_pool::PeerPool,
    protocol::{ArcProtocol, Protocol},
    ArcPeer, PeerID, Result,
};

pub type ArcBackend = Arc<Backend>;

/// Backend serves as the central entry point for initiating and managing
/// the P2P network.
///
///
/// # Example
/// ```
/// use std::sync::Arc;
///
/// use easy_parallel::Parallel;
/// use smol::{channel as smol_channel, future, Executor};
///
/// use karyons_p2p::{Backend, Config, PeerID};
///
/// let peer_id = PeerID::random();
///
/// // Create the configuration for the backend.
/// let mut config = Config::default();
///
///
/// // Create a new Executor
/// let ex = Arc::new(Executor::new());
///
/// // Create a new Backend
/// let backend = Backend::new(peer_id, config, ex.clone());
///
/// let task = async {
///     // Run the backend
///     backend.run().await.unwrap();
///
///     // ....
///
///     // Shutdown the backend
///     backend.shutdown().await;
/// };
///
/// future::block_on(ex.run(task));
///
/// ```
pub struct Backend {
    /// The Configuration for the P2P network.
    config: Arc<Config>,

    /// Peer ID.
    id: PeerID,

    /// Responsible for network and system monitoring.
    monitor: Arc<Monitor>,

    /// Discovery instance.
    discovery: ArcDiscovery,

    /// PeerPool instance.
    peer_pool: Arc<PeerPool>,
}

impl Backend {
    /// Creates a new Backend.
    pub fn new(id: PeerID, config: Config, ex: GlobalExecutor) -> ArcBackend {
        let config = Arc::new(config);
        let monitor = Arc::new(Monitor::new());
        let cq = ConnQueue::new();

        let peer_pool = PeerPool::new(&id, cq.clone(), config.clone(), monitor.clone(), ex.clone());

        let discovery = Discovery::new(&id, cq, config.clone(), monitor.clone(), ex);

        Arc::new(Self {
            id: id.clone(),
            monitor,
            discovery,
            config,
            peer_pool,
        })
    }

    /// Run the Backend, starting the PeerPool and Discovery instances.
    pub async fn run(self: &Arc<Self>) -> Result<()> {
        info!("Run the backend {}", self.id);
        self.peer_pool.start().await?;
        self.discovery.start().await?;
        Ok(())
    }

    /// Attach a custom protocol to the network
    pub async fn attach_protocol<P: Protocol>(
        &self,
        c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static,
    ) -> Result<()> {
        self.peer_pool.attach_protocol::<P>(Box::new(c)).await
    }

    /// Returns the number of currently connected peers.
    pub async fn peers(&self) -> usize {
        self.peer_pool.peers_len().await
    }

    /// Returns the `Config`.
    pub fn config(&self) -> Arc<Config> {
        self.config.clone()
    }

    /// Returns the number of occupied inbound slots.
    pub fn inbound_slots(&self) -> usize {
        self.discovery.inbound_slots.load()
    }

    /// Returns the number of occupied outbound slots.
    pub fn outbound_slots(&self) -> usize {
        self.discovery.outbound_slots.load()
    }

    /// Subscribes to the monitor to receive network events.
    pub async fn monitor(&self) -> Subscription<MonitorEvent> {
        self.monitor.subscribe().await
    }

    /// Shuts down the Backend.
    pub async fn shutdown(&self) {
        self.discovery.shutdown().await;
        self.peer_pool.shutdown().await;
    }
}