aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/backend.rs
blob: 16cc20bb7543de6fc09f702d98b3d7e6ded6a767 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use std::sync::Arc;

use log::info;

use karyon_core::{async_runtime::Executor, crypto::KeyPair};

use crate::{
    config::Config,
    conn_queue::ConnQueue,
    discovery::{ArcDiscovery, Discovery},
    monitor::Monitor,
    peer_pool::PeerPool,
    protocol::{ArcProtocol, Protocol},
    ArcPeer, PeerID, Result,
};

pub type ArcBackend = Arc<Backend>;

/// Backend serves as the central entry point for initiating and managing
/// the P2P network.
pub struct Backend {
    /// The Configuration for the P2P network.
    config: Arc<Config>,

    /// Identity Key pair
    key_pair: KeyPair,

    /// Peer ID
    peer_id: PeerID,

    /// Responsible for network and system monitoring.
    monitor: Arc<Monitor>,

    /// Discovery instance.
    discovery: ArcDiscovery,

    /// PeerPool instance.
    peer_pool: Arc<PeerPool>,
}

impl Backend {
    /// Creates a new Backend.
    pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> ArcBackend {
        let config = Arc::new(config);
        let monitor = Arc::new(Monitor::new(config.clone()));
        let conn_queue = ConnQueue::new();

        let peer_id = PeerID::try_from(key_pair.public())
            .expect("Derive a peer id from the provided key pair.");
        info!("PeerID: {}", peer_id);

        let peer_pool = PeerPool::new(
            &peer_id,
            conn_queue.clone(),
            config.clone(),
            monitor.clone(),
            ex.clone(),
        );

        let discovery = Discovery::new(
            key_pair,
            &peer_id,
            conn_queue,
            config.clone(),
            monitor.clone(),
            ex,
        );

        Arc::new(Self {
            key_pair: key_pair.clone(),
            peer_id,
            monitor,
            discovery,
            config,
            peer_pool,
        })
    }

    /// Run the Backend, starting the PeerPool and Discovery instances.
    pub async fn run(self: &Arc<Self>) -> Result<()> {
        self.peer_pool.start().await?;
        self.discovery.start().await?;
        Ok(())
    }

    /// Attach a custom protocol to the network
    pub async fn attach_protocol<P: Protocol>(
        &self,
        c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static,
    ) -> Result<()> {
        self.peer_pool.attach_protocol::<P>(Box::new(c)).await
    }

    /// Returns the number of currently connected peers.
    pub async fn peers(&self) -> usize {
        self.peer_pool.peers_len().await
    }

    /// Returns the `Config`.
    pub fn config(&self) -> Arc<Config> {
        self.config.clone()
    }

    /// Returns the `PeerID`.
    pub fn peer_id(&self) -> &PeerID {
        &self.peer_id
    }

    /// Returns the `KeyPair`.
    pub fn key_pair(&self) -> &KeyPair {
        &self.key_pair
    }

    /// Returns the number of occupied inbound slots.
    pub fn inbound_slots(&self) -> usize {
        self.discovery.inbound_slots.load()
    }

    /// Returns the number of occupied outbound slots.
    pub fn outbound_slots(&self) -> usize {
        self.discovery.outbound_slots.load()
    }

    /// Returns the monitor to receive system events.
    pub fn monitor(&self) -> Arc<Monitor> {
        self.monitor.clone()
    }

    /// Shuts down the Backend.
    pub async fn shutdown(&self) {
        self.discovery.shutdown().await;
        self.peer_pool.shutdown().await;
    }
}