aboutsummaryrefslogtreecommitdiff
path: root/p2p/src/connector.rs
blob: f41ab57d5d96b2d5a155ee911d81e2da1387e5da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::{future::Future, sync::Arc};

use log::{trace, warn};

use karyons_core::{
    async_utils::{Backoff, TaskGroup, TaskResult},
    GlobalExecutor,
};
use karyons_net::{dial, Conn, Endpoint, NetError};

use crate::{
    monitor::{ConnEvent, Monitor},
    slots::ConnectionSlots,
    Result,
};

/// Responsible for creating outbound connections with other peers.
pub struct Connector {
    /// Managing spawned tasks.
    task_group: TaskGroup<'static>,

    /// Manages available outbound slots.
    connection_slots: Arc<ConnectionSlots>,

    /// The maximum number of retries allowed before successfully
    /// establishing a connection.
    max_retries: usize,

    /// Responsible for network and system monitoring.
    monitor: Arc<Monitor>,
}

impl Connector {
    /// Creates a new Connector
    pub fn new(
        max_retries: usize,
        connection_slots: Arc<ConnectionSlots>,
        monitor: Arc<Monitor>,
        ex: GlobalExecutor,
    ) -> Arc<Self> {
        Arc::new(Self {
            task_group: TaskGroup::new(ex),
            monitor,
            connection_slots,
            max_retries,
        })
    }

    /// Shuts down the connector
    pub async fn shutdown(&self) {
        self.task_group.cancel().await;
    }

    /// Establish a connection to the specified `endpoint`. If the connection
    /// attempt fails, it performs a backoff and retries until the maximum allowed
    /// number of retries is exceeded. On a successful connection, it returns a
    /// `Conn` instance.
    ///
    /// This method will block until it finds an available slot.
    pub async fn connect(&self, endpoint: &Endpoint) -> Result<Conn> {
        self.connection_slots.wait_for_slot().await;
        self.connection_slots.add();

        let mut retry = 0;
        let backoff = Backoff::new(500, 2000);
        while retry < self.max_retries {
            let conn_result = dial(endpoint).await;

            if let Ok(conn) = conn_result {
                self.monitor
                    .notify(&ConnEvent::Connected(endpoint.clone()).into())
                    .await;
                return Ok(conn);
            }

            self.monitor
                .notify(&ConnEvent::ConnectRetried(endpoint.clone()).into())
                .await;

            backoff.sleep().await;

            warn!("try to reconnect {endpoint}");
            retry += 1;
        }

        self.monitor
            .notify(&ConnEvent::ConnectFailed(endpoint.clone()).into())
            .await;

        self.connection_slots.remove().await;
        Err(NetError::Timeout.into())
    }

    /// Establish a connection to the given `endpoint`. For each new connection,
    /// it invokes the provided `callback`, and pass the connection to the callback.
    pub async fn connect_with_cback<Fut>(
        self: &Arc<Self>,
        endpoint: &Endpoint,
        callback: impl FnOnce(Conn) -> Fut + Send + 'static,
    ) -> Result<()>
    where
        Fut: Future<Output = Result<()>> + Send + 'static,
    {
        let conn = self.connect(endpoint).await?;

        let selfc = self.clone();
        let endpoint = endpoint.clone();
        let on_disconnect = |res| async move {
            if let TaskResult::Completed(Err(err)) = res {
                trace!("Outbound connection dropped: {err}");
            }
            selfc
                .monitor
                .notify(&ConnEvent::Disconnected(endpoint.clone()).into())
                .await;
            selfc.connection_slots.remove().await;
        };

        self.task_group.spawn(callback(conn), on_disconnect);

        Ok(())
    }
}