diff options
Diffstat (limited to 'karyons_net/src')
-rw-r--r-- | karyons_net/src/connection.rs | 57 | ||||
-rw-r--r-- | karyons_net/src/endpoint.rs | 223 | ||||
-rw-r--r-- | karyons_net/src/error.rs | 45 | ||||
-rw-r--r-- | karyons_net/src/lib.rs | 24 | ||||
-rw-r--r-- | karyons_net/src/listener.rs | 39 | ||||
-rw-r--r-- | karyons_net/src/transports/mod.rs | 3 | ||||
-rw-r--r-- | karyons_net/src/transports/tcp.rs | 82 | ||||
-rw-r--r-- | karyons_net/src/transports/udp.rs | 77 | ||||
-rw-r--r-- | karyons_net/src/transports/unix.rs | 73 |
9 files changed, 623 insertions, 0 deletions
diff --git a/karyons_net/src/connection.rs b/karyons_net/src/connection.rs new file mode 100644 index 0000000..518ccfd --- /dev/null +++ b/karyons_net/src/connection.rs @@ -0,0 +1,57 @@ +use crate::{Endpoint, Result}; +use async_trait::async_trait; + +use crate::transports::{tcp, udp, unix}; + +/// Alias for `Box<dyn Connection>` +pub type Conn = Box<dyn Connection>; + +/// Connection is a generic network connection interface for +/// `UdpConn`, `TcpConn`, and `UnixConn`. +/// +/// If you are familiar with the Go language, this is similar to the `Conn` +/// interface <https://pkg.go.dev/net#Conn> +#[async_trait] +pub trait Connection: Send + Sync { + /// Returns the remote peer endpoint of this connection + fn peer_endpoint(&self) -> Result<Endpoint>; + + /// Returns the local socket endpoint of this connection + fn local_endpoint(&self) -> Result<Endpoint>; + + /// Reads data from this connection. + async fn recv(&self, buf: &mut [u8]) -> Result<usize>; + + /// Sends data to this connection + async fn send(&self, buf: &[u8]) -> Result<usize>; +} + +/// Connects to the provided endpoint. +/// +/// it only supports `tcp4/6`, `udp4/6` and `unix`. +/// +/// #Example +/// +/// ``` +/// use karyons_net::{Endpoint, dial}; +/// +/// async { +/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); +/// +/// let conn = dial(&endpoint).await.unwrap(); +/// +/// conn.send(b"MSG").await.unwrap(); +/// +/// let mut buffer = [0;32]; +/// conn.recv(&mut buffer).await.unwrap(); +/// }; +/// +/// ``` +/// +pub async fn dial(endpoint: &Endpoint) -> Result<Conn> { + match endpoint { + Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::dial_tcp(addr, port).await?)), + Endpoint::Udp(addr, port) => Ok(Box::new(udp::dial_udp(addr, port).await?)), + Endpoint::Unix(addr) => Ok(Box::new(unix::dial_unix(addr).await?)), + } +} diff --git a/karyons_net/src/endpoint.rs b/karyons_net/src/endpoint.rs new file mode 100644 index 0000000..50dfe6b --- /dev/null +++ b/karyons_net/src/endpoint.rs @@ -0,0 +1,223 @@ +use std::{ + net::{IpAddr, SocketAddr}, + os::unix::net::SocketAddr as UnixSocketAddress, + path::PathBuf, + str::FromStr, +}; + +use bincode::{Decode, Encode}; +use url::Url; + +use crate::{Error, Result}; + +/// Port defined as a u16. +pub type Port = u16; + +/// Endpoint defines generic network endpoints for karyons. +/// +/// # Example +/// +/// ``` +/// use std::net::SocketAddr; +/// +/// use karyons_net::Endpoint; +/// +/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); +/// +/// let socketaddr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); +/// let endpoint = Endpoint::new_udp_addr(&socketaddr); +/// +/// ``` +/// +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Endpoint { + Udp(Addr, Port), + Tcp(Addr, Port), + Unix(String), +} + +impl std::fmt::Display for Endpoint { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Endpoint::Udp(ip, port) => { + write!(f, "udp://{}:{}", ip, port) + } + Endpoint::Tcp(ip, port) => { + write!(f, "tcp://{}:{}", ip, port) + } + Endpoint::Unix(path) => { + if path.is_empty() { + write!(f, "unix:/UNNAMED") + } else { + write!(f, "unix:/{}", path) + } + } + } + } +} + +impl TryFrom<Endpoint> for SocketAddr { + type Error = Error; + fn try_from(endpoint: Endpoint) -> std::result::Result<SocketAddr, Self::Error> { + match endpoint { + Endpoint::Udp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), + Endpoint::Tcp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), + Endpoint::Unix(_) => Err(Error::TryFromEndpointError), + } + } +} + +impl TryFrom<Endpoint> for PathBuf { + type Error = Error; + fn try_from(endpoint: Endpoint) -> std::result::Result<PathBuf, Self::Error> { + match endpoint { + Endpoint::Unix(path) => Ok(PathBuf::from(&path)), + _ => Err(Error::TryFromEndpointError), + } + } +} + +impl TryFrom<Endpoint> for UnixSocketAddress { + type Error = Error; + fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> { + match endpoint { + Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?), + _ => Err(Error::TryFromEndpointError), + } + } +} + +impl FromStr for Endpoint { + type Err = Error; + + fn from_str(s: &str) -> std::result::Result<Self, Self::Err> { + let url: Url = match s.parse() { + Ok(u) => u, + Err(err) => return Err(Error::ParseEndpoint(err.to_string())), + }; + + if url.has_host() { + let host = url.host_str().unwrap(); + + let addr = match host.parse::<IpAddr>() { + Ok(addr) => Addr::Ip(addr), + Err(_) => Addr::Domain(host.to_string()), + }; + + let port = match url.port() { + Some(p) => p, + None => return Err(Error::ParseEndpoint(format!("port missing: {s}"))), + }; + + match url.scheme() { + "tcp" => Ok(Endpoint::Tcp(addr, port)), + "udp" => Ok(Endpoint::Udp(addr, port)), + _ => Err(Error::InvalidEndpoint(s.to_string())), + } + } else { + if url.path().is_empty() { + return Err(Error::InvalidEndpoint(s.to_string())); + } + + match url.scheme() { + "unix" => Ok(Endpoint::Unix(url.path().to_string())), + _ => Err(Error::InvalidEndpoint(s.to_string())), + } + } + } +} + +impl Endpoint { + /// Creates a new TCP endpoint from a `SocketAddr`. + pub fn new_tcp_addr(addr: &SocketAddr) -> Endpoint { + Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new UDP endpoint from a `SocketAddr`. + pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint { + Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new Unix endpoint from a `UnixSocketAddress`. + pub fn new_unix_addr(addr: &UnixSocketAddress) -> Endpoint { + Endpoint::Unix( + addr.as_pathname() + .and_then(|a| a.to_str()) + .unwrap_or("") + .to_string(), + ) + } + + /// Returns the `Port` of the endpoint. + pub fn port(&self) -> Result<&Port> { + match self { + Endpoint::Tcp(_, port) => Ok(port), + Endpoint::Udp(_, port) => Ok(port), + _ => Err(Error::TryFromEndpointError), + } + } + + /// Returns the `Addr` of the endpoint. + pub fn addr(&self) -> Result<&Addr> { + match self { + Endpoint::Tcp(addr, _) => Ok(addr), + Endpoint::Udp(addr, _) => Ok(addr), + _ => Err(Error::TryFromEndpointError), + } + } +} + +/// Addr defines a type for an address, either IP or domain. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] +pub enum Addr { + Ip(IpAddr), + Domain(String), +} + +impl TryFrom<Addr> for IpAddr { + type Error = Error; + fn try_from(addr: Addr) -> std::result::Result<IpAddr, Self::Error> { + match addr { + Addr::Ip(ip) => Ok(ip), + Addr::Domain(d) => Err(Error::InvalidAddress(d)), + } + } +} + +impl std::fmt::Display for Addr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Addr::Ip(ip) => { + write!(f, "{}", ip) + } + Addr::Domain(d) => { + write!(f, "{}", d) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + + #[test] + fn test_endpoint_from_str() { + let endpoint_str: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); + let endpoint = Endpoint::Tcp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 3000); + assert_eq!(endpoint_str, endpoint); + + let endpoint_str: Endpoint = "udp://127.0.0.1:4000".parse().unwrap(); + let endpoint = Endpoint::Udp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 4000); + assert_eq!(endpoint_str, endpoint); + + let endpoint_str: Endpoint = "tcp://example.com:3000".parse().unwrap(); + let endpoint = Endpoint::Tcp(Addr::Domain("example.com".to_string()), 3000); + assert_eq!(endpoint_str, endpoint); + + let endpoint_str = "unix:/home/x/s.socket".parse::<Endpoint>().unwrap(); + let endpoint = Endpoint::Unix("/home/x/s.socket".to_string()); + assert_eq!(endpoint_str, endpoint); + } +} diff --git a/karyons_net/src/error.rs b/karyons_net/src/error.rs new file mode 100644 index 0000000..a1c85db --- /dev/null +++ b/karyons_net/src/error.rs @@ -0,0 +1,45 @@ +use thiserror::Error as ThisError; + +pub type Result<T> = std::result::Result<T, Error>; + +#[derive(ThisError, Debug)] +pub enum Error { + #[error("IO Error: {0}")] + IO(#[from] std::io::Error), + + #[error("Try from endpoint Error")] + TryFromEndpointError, + + #[error("invalid address {0}")] + InvalidAddress(String), + + #[error("invalid endpoint {0}")] + InvalidEndpoint(String), + + #[error("Parse endpoint error {0}")] + ParseEndpoint(String), + + #[error("Timeout Error")] + Timeout, + + #[error("Channel Send Error: {0}")] + ChannelSend(String), + + #[error("Channel Receive Error: {0}")] + ChannelRecv(String), + + #[error("Karyons core error : {0}")] + KaryonsCore(#[from] karyons_core::error::Error), +} + +impl<T> From<smol::channel::SendError<T>> for Error { + fn from(error: smol::channel::SendError<T>) -> Self { + Error::ChannelSend(error.to_string()) + } +} + +impl From<smol::channel::RecvError> for Error { + fn from(error: smol::channel::RecvError) -> Self { + Error::ChannelRecv(error.to_string()) + } +} diff --git a/karyons_net/src/lib.rs b/karyons_net/src/lib.rs new file mode 100644 index 0000000..914c6d8 --- /dev/null +++ b/karyons_net/src/lib.rs @@ -0,0 +1,24 @@ +mod connection; +mod endpoint; +mod error; +mod listener; +mod transports; + +pub use { + connection::{dial, Conn, Connection}, + endpoint::{Addr, Endpoint, Port}, + listener::{listen, Listener}, + transports::{ + tcp::{dial_tcp, listen_tcp, TcpConn}, + udp::{dial_udp, listen_udp, UdpConn}, + unix::{dial_unix, listen_unix, UnixConn}, + }, +}; + +use error::{Error, Result}; + +/// Represents Karyons's Net Error +pub use error::Error as NetError; + +/// Represents Karyons's Net Result +pub use error::Result as NetResult; diff --git a/karyons_net/src/listener.rs b/karyons_net/src/listener.rs new file mode 100644 index 0000000..31a63ae --- /dev/null +++ b/karyons_net/src/listener.rs @@ -0,0 +1,39 @@ +use crate::{Endpoint, Error, Result}; +use async_trait::async_trait; + +use crate::{ + transports::{tcp, unix}, + Conn, +}; + +/// Listener is a generic network listener. +#[async_trait] +pub trait Listener: Send + Sync { + fn local_endpoint(&self) -> Result<Endpoint>; + async fn accept(&self) -> Result<Conn>; +} + +/// Listens to the provided endpoint. +/// +/// it only supports `tcp4/6` and `unix`. +/// +/// #Example +/// +/// ``` +/// use karyons_net::{Endpoint, listen}; +/// +/// async { +/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); +/// +/// let listener = listen(&endpoint).await.unwrap(); +/// let conn = listener.accept().await.unwrap(); +/// }; +/// +/// ``` +pub async fn listen(endpoint: &Endpoint) -> Result<Box<dyn Listener>> { + match endpoint { + Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::listen_tcp(addr, port).await?)), + Endpoint::Unix(addr) => Ok(Box::new(unix::listen_unix(addr)?)), + _ => Err(Error::InvalidEndpoint(endpoint.to_string())), + } +} diff --git a/karyons_net/src/transports/mod.rs b/karyons_net/src/transports/mod.rs new file mode 100644 index 0000000..f399133 --- /dev/null +++ b/karyons_net/src/transports/mod.rs @@ -0,0 +1,3 @@ +pub mod tcp; +pub mod udp; +pub mod unix; diff --git a/karyons_net/src/transports/tcp.rs b/karyons_net/src/transports/tcp.rs new file mode 100644 index 0000000..5ff7b28 --- /dev/null +++ b/karyons_net/src/transports/tcp.rs @@ -0,0 +1,82 @@ +use async_trait::async_trait; + +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::{TcpListener, TcpStream}, +}; + +use crate::{ + connection::Connection, + endpoint::{Addr, Endpoint, Port}, + listener::Listener, + Result, +}; + +/// TCP network connection implementations of the `Connection` trait. +pub struct TcpConn { + inner: TcpStream, + read: Mutex<ReadHalf<TcpStream>>, + write: Mutex<WriteHalf<TcpStream>>, +} + +impl TcpConn { + /// Creates a new TcpConn + pub fn new(conn: TcpStream) -> Self { + let (read, write) = split(conn.clone()); + Self { + inner: conn, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for TcpConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + self.read.lock().await.read_exact(buf).await?; + Ok(buf.len()) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + self.write.lock().await.write_all(buf).await?; + Ok(buf.len()) + } +} + +#[async_trait] +impl Listener for TcpListener { + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_tcp_addr(&self.local_addr()?)) + } + + async fn accept(&self) -> Result<Box<dyn Connection>> { + let (conn, _) = self.accept().await?; + conn.set_nodelay(true)?; + Ok(Box::new(TcpConn::new(conn))) + } +} + +/// Connects to the given TCP address and port. +pub async fn dial_tcp(addr: &Addr, port: &Port) -> Result<TcpConn> { + let address = format!("{}:{}", addr, port); + let conn = TcpStream::connect(address).await?; + conn.set_nodelay(true)?; + Ok(TcpConn::new(conn)) +} + +/// Listens on the given TCP address and port. +pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result<TcpListener> { + let address = format!("{}:{}", addr, port); + let listener = TcpListener::bind(address).await?; + Ok(listener) +} diff --git a/karyons_net/src/transports/udp.rs b/karyons_net/src/transports/udp.rs new file mode 100644 index 0000000..27fb9ae --- /dev/null +++ b/karyons_net/src/transports/udp.rs @@ -0,0 +1,77 @@ +use std::net::SocketAddr; + +use async_trait::async_trait; +use smol::net::UdpSocket; + +use crate::{ + connection::Connection, + endpoint::{Addr, Endpoint, Port}, + Result, +}; + +/// UDP network connection implementations of the `Connection` trait. +pub struct UdpConn { + inner: UdpSocket, +} + +impl UdpConn { + /// Creates a new UdpConn + pub fn new(conn: UdpSocket) -> Self { + Self { inner: conn } + } +} + +impl UdpConn { + /// Receives a single datagram message. Returns the number of bytes read + /// and the origin endpoint. + pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { + let (size, addr) = self.inner.recv_from(buf).await?; + Ok((size, Endpoint::new_udp_addr(&addr))) + } + + /// Sends data to the given address. Returns the number of bytes written. + pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> { + let addr: SocketAddr = addr.clone().try_into()?; + let size = self.inner.send_to(buf, addr).await?; + Ok(size) + } +} + +#[async_trait] +impl Connection for UdpConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + let size = self.inner.recv(buf).await?; + Ok(size) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + let size = self.inner.send(buf).await?; + Ok(size) + } +} + +/// Connects to the given UDP address and port. +pub async fn dial_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { + let address = format!("{}:{}", addr, port); + + // Let the operating system assign an available port to this socket + let conn = UdpSocket::bind("[::]:0").await?; + conn.connect(address).await?; + Ok(UdpConn::new(conn)) +} + +/// Listens on the given UDP address and port. +pub async fn listen_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { + let address = format!("{}:{}", addr, port); + let conn = UdpSocket::bind(address).await?; + let udp_conn = UdpConn::new(conn); + Ok(udp_conn) +} diff --git a/karyons_net/src/transports/unix.rs b/karyons_net/src/transports/unix.rs new file mode 100644 index 0000000..c89832e --- /dev/null +++ b/karyons_net/src/transports/unix.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; + +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::unix::{UnixListener, UnixStream}, +}; + +use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Result}; + +/// Unix domain socket implementations of the `Connection` trait. +pub struct UnixConn { + inner: UnixStream, + read: Mutex<ReadHalf<UnixStream>>, + write: Mutex<WriteHalf<UnixStream>>, +} + +impl UnixConn { + /// Creates a new UnixConn + pub fn new(conn: UnixStream) -> Self { + let (read, write) = split(conn.clone()); + Self { + inner: conn, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for UnixConn { + fn peer_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) + } + + async fn recv(&self, buf: &mut [u8]) -> Result<usize> { + self.read.lock().await.read_exact(buf).await?; + Ok(buf.len()) + } + + async fn send(&self, buf: &[u8]) -> Result<usize> { + self.write.lock().await.write_all(buf).await?; + Ok(buf.len()) + } +} + +#[async_trait] +impl Listener for UnixListener { + fn local_endpoint(&self) -> Result<Endpoint> { + Ok(Endpoint::new_unix_addr(&self.local_addr()?)) + } + + async fn accept(&self) -> Result<Box<dyn Connection>> { + let (conn, _) = self.accept().await?; + Ok(Box::new(UnixConn::new(conn))) + } +} + +/// Connects to the given Unix socket path. +pub async fn dial_unix(path: &String) -> Result<UnixConn> { + let conn = UnixStream::connect(path).await?; + Ok(UnixConn::new(conn)) +} + +/// Listens on the given Unix socket path. +pub fn listen_unix(path: &String) -> Result<UnixListener> { + let listener = UnixListener::bind(path)?; + Ok(listener) +} |