diff options
Diffstat (limited to 'karyons_net/src')
-rw-r--r-- | karyons_net/src/connection.rs | 57 | ||||
-rw-r--r-- | karyons_net/src/endpoint.rs | 223 | ||||
-rw-r--r-- | karyons_net/src/error.rs | 45 | ||||
-rw-r--r-- | karyons_net/src/lib.rs | 24 | ||||
-rw-r--r-- | karyons_net/src/listener.rs | 39 | ||||
-rw-r--r-- | karyons_net/src/transports/mod.rs | 3 | ||||
-rw-r--r-- | karyons_net/src/transports/tcp.rs | 82 | ||||
-rw-r--r-- | karyons_net/src/transports/udp.rs | 77 | ||||
-rw-r--r-- | karyons_net/src/transports/unix.rs | 73 |
9 files changed, 0 insertions, 623 deletions
diff --git a/karyons_net/src/connection.rs b/karyons_net/src/connection.rs deleted file mode 100644 index 518ccfd..0000000 --- a/karyons_net/src/connection.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::{Endpoint, Result}; -use async_trait::async_trait; - -use crate::transports::{tcp, udp, unix}; - -/// Alias for `Box<dyn Connection>` -pub type Conn = Box<dyn Connection>; - -/// Connection is a generic network connection interface for -/// `UdpConn`, `TcpConn`, and `UnixConn`. -/// -/// If you are familiar with the Go language, this is similar to the `Conn` -/// interface <https://pkg.go.dev/net#Conn> -#[async_trait] -pub trait Connection: Send + Sync { - /// Returns the remote peer endpoint of this connection - fn peer_endpoint(&self) -> Result<Endpoint>; - - /// Returns the local socket endpoint of this connection - fn local_endpoint(&self) -> Result<Endpoint>; - - /// Reads data from this connection. - async fn recv(&self, buf: &mut [u8]) -> Result<usize>; - - /// Sends data to this connection - async fn send(&self, buf: &[u8]) -> Result<usize>; -} - -/// Connects to the provided endpoint. -/// -/// it only supports `tcp4/6`, `udp4/6` and `unix`. -/// -/// #Example -/// -/// ``` -/// use karyons_net::{Endpoint, dial}; -/// -/// async { -/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); -/// -/// let conn = dial(&endpoint).await.unwrap(); -/// -/// conn.send(b"MSG").await.unwrap(); -/// -/// let mut buffer = [0;32]; -/// conn.recv(&mut buffer).await.unwrap(); -/// }; -/// -/// ``` -/// -pub async fn dial(endpoint: &Endpoint) -> Result<Conn> { - match endpoint { - Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::dial_tcp(addr, port).await?)), - Endpoint::Udp(addr, port) => Ok(Box::new(udp::dial_udp(addr, port).await?)), - Endpoint::Unix(addr) => Ok(Box::new(unix::dial_unix(addr).await?)), - } -} diff --git a/karyons_net/src/endpoint.rs b/karyons_net/src/endpoint.rs deleted file mode 100644 index 50dfe6b..0000000 --- a/karyons_net/src/endpoint.rs +++ /dev/null @@ -1,223 +0,0 @@ -use std::{ - net::{IpAddr, SocketAddr}, - os::unix::net::SocketAddr as UnixSocketAddress, - path::PathBuf, - str::FromStr, -}; - -use bincode::{Decode, Encode}; -use url::Url; - -use crate::{Error, Result}; - -/// Port defined as a u16. -pub type Port = u16; - -/// Endpoint defines generic network endpoints for karyons. -/// -/// # Example -/// -/// ``` -/// use std::net::SocketAddr; -/// -/// use karyons_net::Endpoint; -/// -/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); -/// -/// let socketaddr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); -/// let endpoint = Endpoint::new_udp_addr(&socketaddr); -/// -/// ``` -/// -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum Endpoint { - Udp(Addr, Port), - Tcp(Addr, Port), - Unix(String), -} - -impl std::fmt::Display for Endpoint { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Endpoint::Udp(ip, port) => { - write!(f, "udp://{}:{}", ip, port) - } - Endpoint::Tcp(ip, port) => { - write!(f, "tcp://{}:{}", ip, port) - } - Endpoint::Unix(path) => { - if path.is_empty() { - write!(f, "unix:/UNNAMED") - } else { - write!(f, "unix:/{}", path) - } - } - } - } -} - -impl TryFrom<Endpoint> for SocketAddr { - type Error = Error; - fn try_from(endpoint: Endpoint) -> std::result::Result<SocketAddr, Self::Error> { - match endpoint { - Endpoint::Udp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), - Endpoint::Tcp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), - Endpoint::Unix(_) => Err(Error::TryFromEndpointError), - } - } -} - -impl TryFrom<Endpoint> for PathBuf { - type Error = Error; - fn try_from(endpoint: Endpoint) -> std::result::Result<PathBuf, Self::Error> { - match endpoint { - Endpoint::Unix(path) => Ok(PathBuf::from(&path)), - _ => Err(Error::TryFromEndpointError), - } - } -} - -impl TryFrom<Endpoint> for UnixSocketAddress { - type Error = Error; - fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> { - match endpoint { - Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?), - _ => Err(Error::TryFromEndpointError), - } - } -} - -impl FromStr for Endpoint { - type Err = Error; - - fn from_str(s: &str) -> std::result::Result<Self, Self::Err> { - let url: Url = match s.parse() { - Ok(u) => u, - Err(err) => return Err(Error::ParseEndpoint(err.to_string())), - }; - - if url.has_host() { - let host = url.host_str().unwrap(); - - let addr = match host.parse::<IpAddr>() { - Ok(addr) => Addr::Ip(addr), - Err(_) => Addr::Domain(host.to_string()), - }; - - let port = match url.port() { - Some(p) => p, - None => return Err(Error::ParseEndpoint(format!("port missing: {s}"))), - }; - - match url.scheme() { - "tcp" => Ok(Endpoint::Tcp(addr, port)), - "udp" => Ok(Endpoint::Udp(addr, port)), - _ => Err(Error::InvalidEndpoint(s.to_string())), - } - } else { - if url.path().is_empty() { - return Err(Error::InvalidEndpoint(s.to_string())); - } - - match url.scheme() { - "unix" => Ok(Endpoint::Unix(url.path().to_string())), - _ => Err(Error::InvalidEndpoint(s.to_string())), - } - } - } -} - -impl Endpoint { - /// Creates a new TCP endpoint from a `SocketAddr`. - pub fn new_tcp_addr(addr: &SocketAddr) -> Endpoint { - Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new UDP endpoint from a `SocketAddr`. - pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint { - Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) - } - - /// Creates a new Unix endpoint from a `UnixSocketAddress`. - pub fn new_unix_addr(addr: &UnixSocketAddress) -> Endpoint { - Endpoint::Unix( - addr.as_pathname() - .and_then(|a| a.to_str()) - .unwrap_or("") - .to_string(), - ) - } - - /// Returns the `Port` of the endpoint. - pub fn port(&self) -> Result<&Port> { - match self { - Endpoint::Tcp(_, port) => Ok(port), - Endpoint::Udp(_, port) => Ok(port), - _ => Err(Error::TryFromEndpointError), - } - } - - /// Returns the `Addr` of the endpoint. - pub fn addr(&self) -> Result<&Addr> { - match self { - Endpoint::Tcp(addr, _) => Ok(addr), - Endpoint::Udp(addr, _) => Ok(addr), - _ => Err(Error::TryFromEndpointError), - } - } -} - -/// Addr defines a type for an address, either IP or domain. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] -pub enum Addr { - Ip(IpAddr), - Domain(String), -} - -impl TryFrom<Addr> for IpAddr { - type Error = Error; - fn try_from(addr: Addr) -> std::result::Result<IpAddr, Self::Error> { - match addr { - Addr::Ip(ip) => Ok(ip), - Addr::Domain(d) => Err(Error::InvalidAddress(d)), - } - } -} - -impl std::fmt::Display for Addr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Addr::Ip(ip) => { - write!(f, "{}", ip) - } - Addr::Domain(d) => { - write!(f, "{}", d) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::Ipv4Addr; - - #[test] - fn test_endpoint_from_str() { - let endpoint_str: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); - let endpoint = Endpoint::Tcp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 3000); - assert_eq!(endpoint_str, endpoint); - - let endpoint_str: Endpoint = "udp://127.0.0.1:4000".parse().unwrap(); - let endpoint = Endpoint::Udp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 4000); - assert_eq!(endpoint_str, endpoint); - - let endpoint_str: Endpoint = "tcp://example.com:3000".parse().unwrap(); - let endpoint = Endpoint::Tcp(Addr::Domain("example.com".to_string()), 3000); - assert_eq!(endpoint_str, endpoint); - - let endpoint_str = "unix:/home/x/s.socket".parse::<Endpoint>().unwrap(); - let endpoint = Endpoint::Unix("/home/x/s.socket".to_string()); - assert_eq!(endpoint_str, endpoint); - } -} diff --git a/karyons_net/src/error.rs b/karyons_net/src/error.rs deleted file mode 100644 index a1c85db..0000000 --- a/karyons_net/src/error.rs +++ /dev/null @@ -1,45 +0,0 @@ -use thiserror::Error as ThisError; - -pub type Result<T> = std::result::Result<T, Error>; - -#[derive(ThisError, Debug)] -pub enum Error { - #[error("IO Error: {0}")] - IO(#[from] std::io::Error), - - #[error("Try from endpoint Error")] - TryFromEndpointError, - - #[error("invalid address {0}")] - InvalidAddress(String), - - #[error("invalid endpoint {0}")] - InvalidEndpoint(String), - - #[error("Parse endpoint error {0}")] - ParseEndpoint(String), - - #[error("Timeout Error")] - Timeout, - - #[error("Channel Send Error: {0}")] - ChannelSend(String), - - #[error("Channel Receive Error: {0}")] - ChannelRecv(String), - - #[error("Karyons core error : {0}")] - KaryonsCore(#[from] karyons_core::error::Error), -} - -impl<T> From<smol::channel::SendError<T>> for Error { - fn from(error: smol::channel::SendError<T>) -> Self { - Error::ChannelSend(error.to_string()) - } -} - -impl From<smol::channel::RecvError> for Error { - fn from(error: smol::channel::RecvError) -> Self { - Error::ChannelRecv(error.to_string()) - } -} diff --git a/karyons_net/src/lib.rs b/karyons_net/src/lib.rs deleted file mode 100644 index 914c6d8..0000000 --- a/karyons_net/src/lib.rs +++ /dev/null @@ -1,24 +0,0 @@ -mod connection; -mod endpoint; -mod error; -mod listener; -mod transports; - -pub use { - connection::{dial, Conn, Connection}, - endpoint::{Addr, Endpoint, Port}, - listener::{listen, Listener}, - transports::{ - tcp::{dial_tcp, listen_tcp, TcpConn}, - udp::{dial_udp, listen_udp, UdpConn}, - unix::{dial_unix, listen_unix, UnixConn}, - }, -}; - -use error::{Error, Result}; - -/// Represents Karyons's Net Error -pub use error::Error as NetError; - -/// Represents Karyons's Net Result -pub use error::Result as NetResult; diff --git a/karyons_net/src/listener.rs b/karyons_net/src/listener.rs deleted file mode 100644 index 31a63ae..0000000 --- a/karyons_net/src/listener.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::{Endpoint, Error, Result}; -use async_trait::async_trait; - -use crate::{ - transports::{tcp, unix}, - Conn, -}; - -/// Listener is a generic network listener. -#[async_trait] -pub trait Listener: Send + Sync { - fn local_endpoint(&self) -> Result<Endpoint>; - async fn accept(&self) -> Result<Conn>; -} - -/// Listens to the provided endpoint. -/// -/// it only supports `tcp4/6` and `unix`. -/// -/// #Example -/// -/// ``` -/// use karyons_net::{Endpoint, listen}; -/// -/// async { -/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); -/// -/// let listener = listen(&endpoint).await.unwrap(); -/// let conn = listener.accept().await.unwrap(); -/// }; -/// -/// ``` -pub async fn listen(endpoint: &Endpoint) -> Result<Box<dyn Listener>> { - match endpoint { - Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::listen_tcp(addr, port).await?)), - Endpoint::Unix(addr) => Ok(Box::new(unix::listen_unix(addr)?)), - _ => Err(Error::InvalidEndpoint(endpoint.to_string())), - } -} diff --git a/karyons_net/src/transports/mod.rs b/karyons_net/src/transports/mod.rs deleted file mode 100644 index f399133..0000000 --- a/karyons_net/src/transports/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod tcp; -pub mod udp; -pub mod unix; diff --git a/karyons_net/src/transports/tcp.rs b/karyons_net/src/transports/tcp.rs deleted file mode 100644 index 5ff7b28..0000000 --- a/karyons_net/src/transports/tcp.rs +++ /dev/null @@ -1,82 +0,0 @@ -use async_trait::async_trait; - -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, - lock::Mutex, - net::{TcpListener, TcpStream}, -}; - -use crate::{ - connection::Connection, - endpoint::{Addr, Endpoint, Port}, - listener::Listener, - Result, -}; - -/// TCP network connection implementations of the `Connection` trait. -pub struct TcpConn { - inner: TcpStream, - read: Mutex<ReadHalf<TcpStream>>, - write: Mutex<WriteHalf<TcpStream>>, -} - -impl TcpConn { - /// Creates a new TcpConn - pub fn new(conn: TcpStream) -> Self { - let (read, write) = split(conn.clone()); - Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), - } - } -} - -#[async_trait] -impl Connection for TcpConn { - fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?)) - } - - fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?)) - } - - async fn recv(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read_exact(buf).await?; - Ok(buf.len()) - } - - async fn send(&self, buf: &[u8]) -> Result<usize> { - self.write.lock().await.write_all(buf).await?; - Ok(buf.len()) - } -} - -#[async_trait] -impl Listener for TcpListener { - fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.local_addr()?)) - } - - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (conn, _) = self.accept().await?; - conn.set_nodelay(true)?; - Ok(Box::new(TcpConn::new(conn))) - } -} - -/// Connects to the given TCP address and port. -pub async fn dial_tcp(addr: &Addr, port: &Port) -> Result<TcpConn> { - let address = format!("{}:{}", addr, port); - let conn = TcpStream::connect(address).await?; - conn.set_nodelay(true)?; - Ok(TcpConn::new(conn)) -} - -/// Listens on the given TCP address and port. -pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result<TcpListener> { - let address = format!("{}:{}", addr, port); - let listener = TcpListener::bind(address).await?; - Ok(listener) -} diff --git a/karyons_net/src/transports/udp.rs b/karyons_net/src/transports/udp.rs deleted file mode 100644 index 27fb9ae..0000000 --- a/karyons_net/src/transports/udp.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::net::SocketAddr; - -use async_trait::async_trait; -use smol::net::UdpSocket; - -use crate::{ - connection::Connection, - endpoint::{Addr, Endpoint, Port}, - Result, -}; - -/// UDP network connection implementations of the `Connection` trait. -pub struct UdpConn { - inner: UdpSocket, -} - -impl UdpConn { - /// Creates a new UdpConn - pub fn new(conn: UdpSocket) -> Self { - Self { inner: conn } - } -} - -impl UdpConn { - /// Receives a single datagram message. Returns the number of bytes read - /// and the origin endpoint. - pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { - let (size, addr) = self.inner.recv_from(buf).await?; - Ok((size, Endpoint::new_udp_addr(&addr))) - } - - /// Sends data to the given address. Returns the number of bytes written. - pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> { - let addr: SocketAddr = addr.clone().try_into()?; - let size = self.inner.send_to(buf, addr).await?; - Ok(size) - } -} - -#[async_trait] -impl Connection for UdpConn { - fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) - } - - fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) - } - - async fn recv(&self, buf: &mut [u8]) -> Result<usize> { - let size = self.inner.recv(buf).await?; - Ok(size) - } - - async fn send(&self, buf: &[u8]) -> Result<usize> { - let size = self.inner.send(buf).await?; - Ok(size) - } -} - -/// Connects to the given UDP address and port. -pub async fn dial_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { - let address = format!("{}:{}", addr, port); - - // Let the operating system assign an available port to this socket - let conn = UdpSocket::bind("[::]:0").await?; - conn.connect(address).await?; - Ok(UdpConn::new(conn)) -} - -/// Listens on the given UDP address and port. -pub async fn listen_udp(addr: &Addr, port: &Port) -> Result<UdpConn> { - let address = format!("{}:{}", addr, port); - let conn = UdpSocket::bind(address).await?; - let udp_conn = UdpConn::new(conn); - Ok(udp_conn) -} diff --git a/karyons_net/src/transports/unix.rs b/karyons_net/src/transports/unix.rs deleted file mode 100644 index c89832e..0000000 --- a/karyons_net/src/transports/unix.rs +++ /dev/null @@ -1,73 +0,0 @@ -use async_trait::async_trait; - -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, - lock::Mutex, - net::unix::{UnixListener, UnixStream}, -}; - -use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Result}; - -/// Unix domain socket implementations of the `Connection` trait. -pub struct UnixConn { - inner: UnixStream, - read: Mutex<ReadHalf<UnixStream>>, - write: Mutex<WriteHalf<UnixStream>>, -} - -impl UnixConn { - /// Creates a new UnixConn - pub fn new(conn: UnixStream) -> Self { - let (read, write) = split(conn.clone()); - Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), - } - } -} - -#[async_trait] -impl Connection for UnixConn { - fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) - } - - fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) - } - - async fn recv(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read_exact(buf).await?; - Ok(buf.len()) - } - - async fn send(&self, buf: &[u8]) -> Result<usize> { - self.write.lock().await.write_all(buf).await?; - Ok(buf.len()) - } -} - -#[async_trait] -impl Listener for UnixListener { - fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.local_addr()?)) - } - - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (conn, _) = self.accept().await?; - Ok(Box::new(UnixConn::new(conn))) - } -} - -/// Connects to the given Unix socket path. -pub async fn dial_unix(path: &String) -> Result<UnixConn> { - let conn = UnixStream::connect(path).await?; - Ok(UnixConn::new(conn)) -} - -/// Listens on the given Unix socket path. -pub fn listen_unix(path: &String) -> Result<UnixListener> { - let listener = UnixListener::bind(path)?; - Ok(listener) -} |