From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- net/src/transports/udp.rs | 114 +++++++++++++++++++++++++++------------------- 1 file changed, 66 insertions(+), 48 deletions(-) (limited to 'net/src/transports/udp.rs') diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs index bd1fe83..950537c 100644 --- a/net/src/transports/udp.rs +++ b/net/src/transports/udp.rs @@ -1,93 +1,111 @@ use std::net::SocketAddr; use async_trait::async_trait; -use smol::net::UdpSocket; +use karyon_core::async_runtime::net::UdpSocket; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, Error, Result, }; +const BUFFER_SIZE: usize = 64 * 1024; + +/// UDP configuration +#[derive(Default)] +pub struct UdpConfig {} + /// UDP network connection implementation of the [`Connection`] trait. -pub struct UdpConn { +#[allow(dead_code)] +pub struct UdpConn { inner: UdpSocket, + codec: C, + config: UdpConfig, } -impl UdpConn { +impl UdpConn +where + C: Codec + Clone, +{ /// Creates a new UdpConn - pub fn new(conn: UdpSocket) -> Self { - Self { inner: conn } - } -} - -impl UdpConn { - /// Receives a single datagram message. Returns the number of bytes read - /// and the origin endpoint. - pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { - let (size, addr) = self.inner.recv_from(buf).await?; - Ok((size, Endpoint::new_udp_addr(&addr))) - } - - /// Sends data to the given address. Returns the number of bytes written. - pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result { - let addr: SocketAddr = addr.clone().try_into()?; - let size = self.inner.send_to(buf, addr).await?; - Ok(size) + fn new(socket: UdpSocket, config: UdpConfig, codec: C) -> Self { + Self { + inner: socket, + codec, + config, + } } } #[async_trait] -impl Connection for UdpConn { +impl Connection for UdpConn +where + C: Codec + Clone, +{ + type Item = (C::Item, Endpoint); fn peer_endpoint(&self) -> Result { - Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) + self.inner + .peer_addr() + .map(Endpoint::new_udp_addr) + .map_err(Error::from) } fn local_endpoint(&self) -> Result { - Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) + self.inner + .local_addr() + .map(Endpoint::new_udp_addr) + .map_err(Error::from) } - async fn read(&self, buf: &mut [u8]) -> Result { - self.inner.recv(buf).await.map_err(Error::from) + async fn recv(&self) -> Result { + let mut buf = [0u8; BUFFER_SIZE]; + let (_, addr) = self.inner.recv_from(&mut buf).await?; + match self.codec.decode(&mut buf)? { + Some((_, item)) => Ok((item, Endpoint::new_udp_addr(addr))), + None => Err(Error::Decode("Unable to decode".into())), + } } - async fn write(&self, buf: &[u8]) -> Result { - self.inner.send(buf).await.map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + let (msg, out_addr) = msg; + let mut buf = [0u8; BUFFER_SIZE]; + self.codec.encode(&msg, &mut buf)?; + let addr: SocketAddr = out_addr.try_into()?; + self.inner.send_to(&buf, addr).await?; + Ok(()) } } /// Connects to the given UDP address and port. -pub async fn dial(endpoint: &Endpoint) -> Result { +pub async fn dial(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; // Let the operating system assign an available port to this socket let conn = UdpSocket::bind("[::]:0").await?; conn.connect(addr).await?; - Ok(UdpConn::new(conn)) + Ok(UdpConn::new(conn, config, codec)) } /// Listens on the given UDP address and port. -pub async fn listen(endpoint: &Endpoint) -> Result { +pub async fn listen(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; let conn = UdpSocket::bind(addr).await?; - let udp_conn = UdpConn::new(conn); - Ok(udp_conn) -} - -impl From for Box { - fn from(conn: UdpSocket) -> Self { - Box::new(UdpConn::new(conn)) - } -} - -impl ToConn for UdpSocket { - fn to_conn(self) -> Box { - self.into() - } + Ok(UdpConn::new(conn, config, codec)) } -impl ToConn for UdpConn { - fn to_conn(self) -> Box { +impl ToConn for UdpConn +where + C: Codec + Clone, +{ + type Item = (C::Item, Endpoint); + fn to_conn(self) -> Conn { Box::new(self) } } -- cgit v1.2.3