diff options
Diffstat (limited to 'net/src/transports/udp.rs')
| -rw-r--r-- | net/src/transports/udp.rs | 114 | 
1 files changed, 66 insertions, 48 deletions
| diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs index bd1fe83..950537c 100644 --- a/net/src/transports/udp.rs +++ b/net/src/transports/udp.rs @@ -1,93 +1,111 @@  use std::net::SocketAddr;  use async_trait::async_trait; -use smol::net::UdpSocket; +use karyon_core::async_runtime::net::UdpSocket;  use crate::{ -    connection::{Connection, ToConn}, +    codec::Codec, +    connection::{Conn, Connection, ToConn},      endpoint::Endpoint,      Error, Result,  }; +const BUFFER_SIZE: usize = 64 * 1024; + +/// UDP configuration +#[derive(Default)] +pub struct UdpConfig {} +  /// UDP network connection implementation of the [`Connection`] trait. -pub struct UdpConn { +#[allow(dead_code)] +pub struct UdpConn<C> {      inner: UdpSocket, +    codec: C, +    config: UdpConfig,  } -impl UdpConn { +impl<C> UdpConn<C> +where +    C: Codec + Clone, +{      /// Creates a new UdpConn -    pub fn new(conn: UdpSocket) -> Self { -        Self { inner: conn } -    } -} - -impl UdpConn { -    /// Receives a single datagram message. Returns the number of bytes read -    /// and the origin endpoint. -    pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { -        let (size, addr) = self.inner.recv_from(buf).await?; -        Ok((size, Endpoint::new_udp_addr(&addr))) -    } - -    /// Sends data to the given address. Returns the number of bytes written. -    pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> { -        let addr: SocketAddr = addr.clone().try_into()?; -        let size = self.inner.send_to(buf, addr).await?; -        Ok(size) +    fn new(socket: UdpSocket, config: UdpConfig, codec: C) -> Self { +        Self { +            inner: socket, +            codec, +            config, +        }      }  }  #[async_trait] -impl Connection for UdpConn { +impl<C> Connection for UdpConn<C> +where +    C: Codec + Clone, +{ +    type Item = (C::Item, Endpoint);      fn peer_endpoint(&self) -> Result<Endpoint> { -        Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) +        self.inner +            .peer_addr() +            .map(Endpoint::new_udp_addr) +            .map_err(Error::from)      }      fn local_endpoint(&self) -> Result<Endpoint> { -        Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) +        self.inner +            .local_addr() +            .map(Endpoint::new_udp_addr) +            .map_err(Error::from)      } -    async fn read(&self, buf: &mut [u8]) -> Result<usize> { -        self.inner.recv(buf).await.map_err(Error::from) +    async fn recv(&self) -> Result<Self::Item> { +        let mut buf = [0u8; BUFFER_SIZE]; +        let (_, addr) = self.inner.recv_from(&mut buf).await?; +        match self.codec.decode(&mut buf)? { +            Some((_, item)) => Ok((item, Endpoint::new_udp_addr(addr))), +            None => Err(Error::Decode("Unable to decode".into())), +        }      } -    async fn write(&self, buf: &[u8]) -> Result<usize> { -        self.inner.send(buf).await.map_err(Error::from) +    async fn send(&self, msg: Self::Item) -> Result<()> { +        let (msg, out_addr) = msg; +        let mut buf = [0u8; BUFFER_SIZE]; +        self.codec.encode(&msg, &mut buf)?; +        let addr: SocketAddr = out_addr.try_into()?; +        self.inner.send_to(&buf, addr).await?; +        Ok(())      }  }  /// Connects to the given UDP address and port. -pub async fn dial(endpoint: &Endpoint) -> Result<UdpConn> { +pub async fn dial<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>> +where +    C: Codec + Clone, +{      let addr = SocketAddr::try_from(endpoint.clone())?;      // Let the operating system assign an available port to this socket      let conn = UdpSocket::bind("[::]:0").await?;      conn.connect(addr).await?; -    Ok(UdpConn::new(conn)) +    Ok(UdpConn::new(conn, config, codec))  }  /// Listens on the given UDP address and port. -pub async fn listen(endpoint: &Endpoint) -> Result<UdpConn> { +pub async fn listen<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>> +where +    C: Codec + Clone, +{      let addr = SocketAddr::try_from(endpoint.clone())?;      let conn = UdpSocket::bind(addr).await?; -    let udp_conn = UdpConn::new(conn); -    Ok(udp_conn) -} - -impl From<UdpSocket> for Box<dyn Connection> { -    fn from(conn: UdpSocket) -> Self { -        Box::new(UdpConn::new(conn)) -    } -} - -impl ToConn for UdpSocket { -    fn to_conn(self) -> Box<dyn Connection> { -        self.into() -    } +    Ok(UdpConn::new(conn, config, codec))  } -impl ToConn for UdpConn { -    fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for UdpConn<C> +where +    C: Codec + Clone, +{ +    type Item = (C::Item, Endpoint); +    fn to_conn(self) -> Conn<Self::Item> {          Box::new(self)      }  } | 
