aboutsummaryrefslogtreecommitdiff
path: root/net/src/transports/udp.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-04-11 10:19:20 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-19 13:51:30 +0200
commit0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch)
tree961d73218af672797d49f899289bef295bc56493 /net/src/transports/udp.rs
parenta69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff)
add support for tokio & improve net crate api
Diffstat (limited to 'net/src/transports/udp.rs')
-rw-r--r--net/src/transports/udp.rs114
1 files changed, 66 insertions, 48 deletions
diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs
index bd1fe83..950537c 100644
--- a/net/src/transports/udp.rs
+++ b/net/src/transports/udp.rs
@@ -1,93 +1,111 @@
use std::net::SocketAddr;
use async_trait::async_trait;
-use smol::net::UdpSocket;
+use karyon_core::async_runtime::net::UdpSocket;
use crate::{
- connection::{Connection, ToConn},
+ codec::Codec,
+ connection::{Conn, Connection, ToConn},
endpoint::Endpoint,
Error, Result,
};
+const BUFFER_SIZE: usize = 64 * 1024;
+
+/// UDP configuration
+#[derive(Default)]
+pub struct UdpConfig {}
+
/// UDP network connection implementation of the [`Connection`] trait.
-pub struct UdpConn {
+#[allow(dead_code)]
+pub struct UdpConn<C> {
inner: UdpSocket,
+ codec: C,
+ config: UdpConfig,
}
-impl UdpConn {
+impl<C> UdpConn<C>
+where
+ C: Codec + Clone,
+{
/// Creates a new UdpConn
- pub fn new(conn: UdpSocket) -> Self {
- Self { inner: conn }
- }
-}
-
-impl UdpConn {
- /// Receives a single datagram message. Returns the number of bytes read
- /// and the origin endpoint.
- pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> {
- let (size, addr) = self.inner.recv_from(buf).await?;
- Ok((size, Endpoint::new_udp_addr(&addr)))
- }
-
- /// Sends data to the given address. Returns the number of bytes written.
- pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> {
- let addr: SocketAddr = addr.clone().try_into()?;
- let size = self.inner.send_to(buf, addr).await?;
- Ok(size)
+ fn new(socket: UdpSocket, config: UdpConfig, codec: C) -> Self {
+ Self {
+ inner: socket,
+ codec,
+ config,
+ }
}
}
#[async_trait]
-impl Connection for UdpConn {
+impl<C> Connection for UdpConn<C>
+where
+ C: Codec + Clone,
+{
+ type Item = (C::Item, Endpoint);
fn peer_endpoint(&self) -> Result<Endpoint> {
- Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?))
+ self.inner
+ .peer_addr()
+ .map(Endpoint::new_udp_addr)
+ .map_err(Error::from)
}
fn local_endpoint(&self) -> Result<Endpoint> {
- Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?))
+ self.inner
+ .local_addr()
+ .map(Endpoint::new_udp_addr)
+ .map_err(Error::from)
}
- async fn read(&self, buf: &mut [u8]) -> Result<usize> {
- self.inner.recv(buf).await.map_err(Error::from)
+ async fn recv(&self) -> Result<Self::Item> {
+ let mut buf = [0u8; BUFFER_SIZE];
+ let (_, addr) = self.inner.recv_from(&mut buf).await?;
+ match self.codec.decode(&mut buf)? {
+ Some((_, item)) => Ok((item, Endpoint::new_udp_addr(addr))),
+ None => Err(Error::Decode("Unable to decode".into())),
+ }
}
- async fn write(&self, buf: &[u8]) -> Result<usize> {
- self.inner.send(buf).await.map_err(Error::from)
+ async fn send(&self, msg: Self::Item) -> Result<()> {
+ let (msg, out_addr) = msg;
+ let mut buf = [0u8; BUFFER_SIZE];
+ self.codec.encode(&msg, &mut buf)?;
+ let addr: SocketAddr = out_addr.try_into()?;
+ self.inner.send_to(&buf, addr).await?;
+ Ok(())
}
}
/// Connects to the given UDP address and port.
-pub async fn dial(endpoint: &Endpoint) -> Result<UdpConn> {
+pub async fn dial<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>>
+where
+ C: Codec + Clone,
+{
let addr = SocketAddr::try_from(endpoint.clone())?;
// Let the operating system assign an available port to this socket
let conn = UdpSocket::bind("[::]:0").await?;
conn.connect(addr).await?;
- Ok(UdpConn::new(conn))
+ Ok(UdpConn::new(conn, config, codec))
}
/// Listens on the given UDP address and port.
-pub async fn listen(endpoint: &Endpoint) -> Result<UdpConn> {
+pub async fn listen<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>>
+where
+ C: Codec + Clone,
+{
let addr = SocketAddr::try_from(endpoint.clone())?;
let conn = UdpSocket::bind(addr).await?;
- let udp_conn = UdpConn::new(conn);
- Ok(udp_conn)
-}
-
-impl From<UdpSocket> for Box<dyn Connection> {
- fn from(conn: UdpSocket) -> Self {
- Box::new(UdpConn::new(conn))
- }
-}
-
-impl ToConn for UdpSocket {
- fn to_conn(self) -> Box<dyn Connection> {
- self.into()
- }
+ Ok(UdpConn::new(conn, config, codec))
}
-impl ToConn for UdpConn {
- fn to_conn(self) -> Box<dyn Connection> {
+impl<C> ToConn for UdpConn<C>
+where
+ C: Codec + Clone,
+{
+ type Item = (C::Item, Endpoint);
+ fn to_conn(self) -> Conn<Self::Item> {
Box::new(self)
}
}