diff options
Diffstat (limited to 'net/src/transports')
-rw-r--r-- | net/src/transports/mod.rs | 3 | ||||
-rw-r--r-- | net/src/transports/tcp.rs | 82 | ||||
-rw-r--r-- | net/src/transports/udp.rs | 77 | ||||
-rw-r--r-- | net/src/transports/unix.rs | 73 |
4 files changed, 235 insertions, 0 deletions
diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs new file mode 100644 index 0000000..f399133 --- /dev/null +++ b/net/src/transports/mod.rs @@ -0,0 +1,3 @@ +pub mod tcp; +pub mod udp; +pub mod unix; diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs new file mode 100644 index 0000000..5ff7b28 --- /dev/null +++ b/net/src/transports/tcp.rs @@ -0,0 +1,82 @@ +use async_trait::async_trait; + +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::{TcpListener, TcpStream}, +}; + +use crate::{ + connection::Connection, + endpoint::{Addr, Endpoint, Port}, + listener::Listener, + Result, +}; + +/// TCP network connection implementations of the `Connection` trait. +pub struct TcpConn { + inner: TcpStream, + read: Mutex<ReadHalf<TcpStream>>, + write: Mutex<WriteHalf<TcpStream>>, +} + +impl TcpConn { + /// Creates a new TcpConn + pub fn new(conn: TcpStream) -> Self { + let (read, write) = split(conn.clone()); + Self { + inner: conn, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for TcpConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + self.read.lock().await.read_exact(buf).await?; + Ok(buf.len()) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + self.write.lock().await.write_all(buf).await?; + Ok(buf.len()) + } +} + +#[async_trait] +impl Listener for TcpListener { + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.local_addr()?)) + } + + async fn accept(&self) -> Result<Box<dyn Connection>> { + let (conn, _) = self.accept().await?; + conn.set_nodelay(true)?; + Ok(Box::new(TcpConn::new(conn))) + } +} + +/// Connects to the given TCP address and port. +pub async fn dial_tcp(addr: &Addr, port: &Port) -> Result<TcpConn> { + let address = format!("{}:{}", addr, port); + let conn = TcpStream::connect(address).await?; + conn.set_nodelay(true)?; + Ok(TcpConn::new(conn)) +} + +/// Listens on the given TCP address and port. +pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result<TcpListener> { + let address = format!("{}:{}", addr, port); + let listener = TcpListener::bind(address).await?; + Ok(listener) +} diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs new file mode 100644 index 0000000..27fb9ae --- /dev/null +++ b/net/src/transports/udp.rs @@ -0,0 +1,77 @@ +use std::net::SocketAddr; + +use async_trait::async_trait; +use smol::net::UdpSocket; + +use crate::{ + connection::Connection, + endpoint::{Addr, Endpoint, Port}, + Result, +}; + +/// UDP network connection implementations of the `Connection` trait. +pub struct UdpConn { + inner: UdpSocket, +} + +impl UdpConn { + /// Creates a new UdpConn + pub fn new(conn: UdpSocket) -> Self { + Self { inner: conn } + } +} + +impl UdpConn { + /// Receives a single datagram message. Returns the number of bytes read + /// and the origin endpoint. + pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { + let (size, addr) = self.inner.recv_from(buf).await?; + Ok((size, Endpoint::new_udp_addr(&addr))) + } + + /// Sends data to the given address. Returns the number of bytes written. + pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> { + let addr: SocketAddr = addr.clone().try_into()?; + let size = self.inner.send_to(buf, addr).await?; + Ok(size) + } +} + +#[async_trait] +impl Connection for UdpConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + let size = self.inner.recv(buf).await?; + Ok(size) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + let size = self.inner.send(buf).await?; + Ok(size) + } +} + +/// Connects to the given UDP address and port. +pub async fn dial_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { + let address = format!("{}:{}", addr, port); + + // Let the operating system assign an available port to this socket + let conn = UdpSocket::bind("[::]:0").await?; + conn.connect(address).await?; + Ok(UdpConn::new(conn)) +} + +/// Listens on the given UDP address and port. +pub async fn listen_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { + let address = format!("{}:{}", addr, port); + let conn = UdpSocket::bind(address).await?; + let udp_conn = UdpConn::new(conn); + Ok(udp_conn) +} diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs new file mode 100644 index 0000000..c89832e --- /dev/null +++ b/net/src/transports/unix.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; + +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::unix::{UnixListener, UnixStream}, +}; + +use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Result}; + +/// Unix domain socket implementations of the `Connection` trait. +pub struct UnixConn { + inner: UnixStream, + read: Mutex<ReadHalf<UnixStream>>, + write: Mutex<WriteHalf<UnixStream>>, +} + +impl UnixConn { + /// Creates a new UnixConn + pub fn new(conn: UnixStream) -> Self { + let (read, write) = split(conn.clone()); + Self { + inner: conn, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for UnixConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + self.read.lock().await.read_exact(buf).await?; + Ok(buf.len()) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + self.write.lock().await.write_all(buf).await?; + Ok(buf.len()) + } +} + +#[async_trait] +impl Listener for UnixListener { + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.local_addr()?)) + } + + async fn accept(&self) -> Result<Box<dyn Connection>> { + let (conn, _) = self.accept().await?; + Ok(Box::new(UnixConn::new(conn))) + } +} + +/// Connects to the given Unix socket path. +pub async fn dial_unix(path: &String) -> Result<UnixConn> { + let conn = UnixStream::connect(path).await?; + Ok(UnixConn::new(conn)) +} + +/// Listens on the given Unix socket path. +pub fn listen_unix(path: &String) -> Result<UnixListener> { + let listener = UnixListener::bind(path)?; + Ok(listener) +} |