aboutsummaryrefslogtreecommitdiff
path: root/net/src/transports/udp.rs
blob: 950537cc30dac67b158ab97b7a07c6a89d93caa7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::net::SocketAddr;

use async_trait::async_trait;
use karyon_core::async_runtime::net::UdpSocket;

use crate::{
    codec::Codec,
    connection::{Conn, Connection, ToConn},
    endpoint::Endpoint,
    Error, Result,
};

/// Size in bytes (64 KiB) of the temporary buffer allocated for each
/// datagram that is received or encoded for sending.
const BUFFER_SIZE: usize = 64 * 1024;

/// UDP configuration.
///
/// The struct currently has no fields; it is passed to [`dial`] and
/// [`listen`] and stored in the resulting [`UdpConn`]. Construct it with
/// `UdpConfig::default()`.
#[derive(Default)]
pub struct UdpConfig {}

/// UDP network connection implementation of the [`Connection`] trait.
///
/// Bundles a [`UdpSocket`] with the codec `C` used to turn datagrams into
/// messages and back, plus the [`UdpConfig`] it was created with.
// NOTE(review): `dead_code` is presumably allowed because `config` is stored
// but never read in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct UdpConn<C> {
    /// The underlying UDP socket.
    inner: UdpSocket,
    /// Codec used to decode received datagrams and encode outgoing messages.
    codec: C,
    /// Configuration supplied when the connection was created.
    config: UdpConfig,
}

impl<C> UdpConn<C>
where
    C: Codec + Clone,
{
    /// Builds a `UdpConn` around an already created `socket`, keeping the
    /// given `config` and `codec` alongside it.
    fn new(socket: UdpSocket, config: UdpConfig, codec: C) -> Self {
        let inner = socket;
        Self {
            inner,
            codec,
            config,
        }
    }
}

#[async_trait]
impl<C> Connection for UdpConn<C>
where
    C: Codec + Clone,
{
    /// A message paired with the endpoint it was received from (for `recv`)
    /// or should be sent to (for `send`).
    type Item = (C::Item, Endpoint);

    /// Returns the remote endpoint reported by the socket's `peer_addr`.
    ///
    /// # Errors
    /// Propagates any error returned by the socket's `peer_addr` call.
    fn peer_endpoint(&self) -> Result<Endpoint> {
        self.inner
            .peer_addr()
            .map(Endpoint::new_udp_addr)
            .map_err(Error::from)
    }

    /// Returns the local endpoint the socket is bound to.
    ///
    /// # Errors
    /// Propagates any error returned by the socket's `local_addr` call.
    fn local_endpoint(&self) -> Result<Endpoint> {
        self.inner
            .local_addr()
            .map(Endpoint::new_udp_addr)
            .map_err(Error::from)
    }

    /// Receives one datagram and decodes it with the codec.
    ///
    /// Returns the decoded message together with the sender's endpoint.
    ///
    /// # Errors
    /// Returns the socket error if receiving fails, the codec error if
    /// decoding fails, and `Error::Decode` if the codec yields no item for
    /// the received bytes.
    async fn recv(&self) -> Result<Self::Item> {
        let mut buf = [0u8; BUFFER_SIZE];
        let (len, addr) = self.inner.recv_from(&mut buf).await?;
        // Only the first `len` bytes belong to this datagram; the rest of the
        // buffer is zero padding that the codec must not see as payload.
        match self.codec.decode(&mut buf[..len])? {
            Some((_, item)) => Ok((item, Endpoint::new_udp_addr(addr))),
            None => Err(Error::Decode("Unable to decode".into())),
        }
    }

    /// Encodes the message with the codec and sends it as a single datagram
    /// to the endpoint carried in `msg`.
    ///
    /// # Errors
    /// Returns the codec error if encoding fails, the conversion error if the
    /// endpoint cannot be turned into a `SocketAddr`, and the socket error if
    /// sending fails.
    async fn send(&self, msg: Self::Item) -> Result<()> {
        let (msg, out_addr) = msg;
        let mut buf = [0u8; BUFFER_SIZE];
        // The codec reports how many bytes it wrote into `buf`.
        let len = self.codec.encode(&msg, &mut buf)?;
        let addr: SocketAddr = out_addr.try_into()?;
        // Send just the encoded bytes: the full 64 KiB buffer would append
        // zero padding and is larger than the maximum UDP payload.
        self.inner.send_to(&buf[..len], addr).await?;
        Ok(())
    }
}

/// Connects to the given UDP address and port.
///
/// Binds a local socket on an OS-assigned port, using the wildcard address
/// of the same IP family as `endpoint`, then connects it to `endpoint`.
///
/// # Errors
/// Returns an error if `endpoint` cannot be converted into a `SocketAddr`,
/// or if binding or connecting the socket fails.
pub async fn dial<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>>
where
    C: Codec + Clone,
{
    let addr = SocketAddr::try_from(endpoint.clone())?;

    // Let the operating system assign an available port to this socket.
    // The wildcard address must match the peer's family: an IPv6 socket
    // cannot reach an IPv4 peer on hosts without dual-stack support, and
    // cannot even be created where IPv6 is disabled.
    let local_addr = if addr.is_ipv4() {
        "0.0.0.0:0"
    } else {
        "[::]:0"
    };
    let conn = UdpSocket::bind(local_addr).await?;
    conn.connect(addr).await?;
    Ok(UdpConn::new(conn, config, codec))
}

/// Binds a UDP socket to the address and port described by `endpoint` and
/// wraps it in a [`UdpConn`] using the given `config` and `codec`.
///
/// # Errors
/// Returns an error if `endpoint` cannot be converted into a `SocketAddr`
/// or if the socket cannot be bound.
pub async fn listen<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>>
where
    C: Codec + Clone,
{
    let bind_addr: SocketAddr = endpoint.clone().try_into()?;
    let socket = UdpSocket::bind(bind_addr).await?;
    Ok(UdpConn::new(socket, config, codec))
}

impl<C> ToConn for UdpConn<C>
where
    C: Codec + Clone,
{
    /// Same item type as the [`Connection`] implementation: a message paired
    /// with an endpoint.
    type Item = (C::Item, Endpoint);

    /// Consumes the connection and returns it boxed as a [`Conn`].
    fn to_conn(self) -> Conn<Self::Item> {
        Box::new(self) as Conn<Self::Item>
    }
}