aboutsummaryrefslogtreecommitdiff
path: root/net/src
diff options
context:
space:
mode:
Diffstat (limited to 'net/src')
-rw-r--r--net/src/connection.rs57
-rw-r--r--net/src/endpoint.rs223
-rw-r--r--net/src/error.rs45
-rw-r--r--net/src/lib.rs24
-rw-r--r--net/src/listener.rs39
-rw-r--r--net/src/transports/mod.rs3
-rw-r--r--net/src/transports/tcp.rs82
-rw-r--r--net/src/transports/udp.rs77
-rw-r--r--net/src/transports/unix.rs73
9 files changed, 623 insertions, 0 deletions
diff --git a/net/src/connection.rs b/net/src/connection.rs
new file mode 100644
index 0000000..518ccfd
--- /dev/null
+++ b/net/src/connection.rs
@@ -0,0 +1,57 @@
+use crate::{Endpoint, Result};
+use async_trait::async_trait;
+
+use crate::transports::{tcp, udp, unix};
+
+/// Alias for `Box<dyn Connection>`
+pub type Conn = Box<dyn Connection>;
+
+/// Connection is a generic network connection interface for
+/// `UdpConn`, `TcpConn`, and `UnixConn`.
+///
+/// If you are familiar with the Go language, this is similar to the `Conn`
+/// interface <https://pkg.go.dev/net#Conn>
+#[async_trait]
+pub trait Connection: Send + Sync {
+ /// Returns the remote peer endpoint of this connection
+ fn peer_endpoint(&self) -> Result<Endpoint>;
+
+ /// Returns the local socket endpoint of this connection
+ fn local_endpoint(&self) -> Result<Endpoint>;
+
+ /// Reads data from this connection.
+ async fn recv(&self, buf: &mut [u8]) -> Result<usize>;
+
+ /// Sends data to this connection
+ async fn send(&self, buf: &[u8]) -> Result<usize>;
+}
+
+/// Connects to the provided endpoint.
+///
+/// it only supports `tcp4/6`, `udp4/6` and `unix`.
+///
+/// #Example
+///
+/// ```
+/// use karyons_net::{Endpoint, dial};
+///
+/// async {
+/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap();
+///
+/// let conn = dial(&endpoint).await.unwrap();
+///
+/// conn.send(b"MSG").await.unwrap();
+///
+/// let mut buffer = [0;32];
+/// conn.recv(&mut buffer).await.unwrap();
+/// };
+///
+/// ```
+///
+pub async fn dial(endpoint: &Endpoint) -> Result<Conn> {
+ match endpoint {
+ Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::dial_tcp(addr, port).await?)),
+ Endpoint::Udp(addr, port) => Ok(Box::new(udp::dial_udp(addr, port).await?)),
+ Endpoint::Unix(addr) => Ok(Box::new(unix::dial_unix(addr).await?)),
+ }
+}
diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs
new file mode 100644
index 0000000..50dfe6b
--- /dev/null
+++ b/net/src/endpoint.rs
@@ -0,0 +1,223 @@
+use std::{
+ net::{IpAddr, SocketAddr},
+ os::unix::net::SocketAddr as UnixSocketAddress,
+ path::PathBuf,
+ str::FromStr,
+};
+
+use bincode::{Decode, Encode};
+use url::Url;
+
+use crate::{Error, Result};
+
+/// Port defined as a u16.
+pub type Port = u16;
+
+/// Endpoint defines generic network endpoints for karyons.
+///
+/// # Example
+///
+/// ```
+/// use std::net::SocketAddr;
+///
+/// use karyons_net::Endpoint;
+///
+/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap();
+///
+/// let socketaddr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
+/// let endpoint = Endpoint::new_udp_addr(&socketaddr);
+///
+/// ```
+///
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum Endpoint {
+ Udp(Addr, Port),
+ Tcp(Addr, Port),
+ Unix(String),
+}
+
+impl std::fmt::Display for Endpoint {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match self {
+ Endpoint::Udp(ip, port) => {
+ write!(f, "udp://{}:{}", ip, port)
+ }
+ Endpoint::Tcp(ip, port) => {
+ write!(f, "tcp://{}:{}", ip, port)
+ }
+ Endpoint::Unix(path) => {
+ if path.is_empty() {
+ write!(f, "unix:/UNNAMED")
+ } else {
+ write!(f, "unix:/{}", path)
+ }
+ }
+ }
+ }
+}
+
+impl TryFrom<Endpoint> for SocketAddr {
+ type Error = Error;
+ fn try_from(endpoint: Endpoint) -> std::result::Result<SocketAddr, Self::Error> {
+ match endpoint {
+ Endpoint::Udp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)),
+ Endpoint::Tcp(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)),
+ Endpoint::Unix(_) => Err(Error::TryFromEndpointError),
+ }
+ }
+}
+
+impl TryFrom<Endpoint> for PathBuf {
+ type Error = Error;
+ fn try_from(endpoint: Endpoint) -> std::result::Result<PathBuf, Self::Error> {
+ match endpoint {
+ Endpoint::Unix(path) => Ok(PathBuf::from(&path)),
+ _ => Err(Error::TryFromEndpointError),
+ }
+ }
+}
+
+impl TryFrom<Endpoint> for UnixSocketAddress {
+ type Error = Error;
+ fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> {
+ match endpoint {
+ Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?),
+ _ => Err(Error::TryFromEndpointError),
+ }
+ }
+}
+
+impl FromStr for Endpoint {
+ type Err = Error;
+
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ let url: Url = match s.parse() {
+ Ok(u) => u,
+ Err(err) => return Err(Error::ParseEndpoint(err.to_string())),
+ };
+
+ if url.has_host() {
+ let host = url.host_str().unwrap();
+
+ let addr = match host.parse::<IpAddr>() {
+ Ok(addr) => Addr::Ip(addr),
+ Err(_) => Addr::Domain(host.to_string()),
+ };
+
+ let port = match url.port() {
+ Some(p) => p,
+ None => return Err(Error::ParseEndpoint(format!("port missing: {s}"))),
+ };
+
+ match url.scheme() {
+ "tcp" => Ok(Endpoint::Tcp(addr, port)),
+ "udp" => Ok(Endpoint::Udp(addr, port)),
+ _ => Err(Error::InvalidEndpoint(s.to_string())),
+ }
+ } else {
+ if url.path().is_empty() {
+ return Err(Error::InvalidEndpoint(s.to_string()));
+ }
+
+ match url.scheme() {
+ "unix" => Ok(Endpoint::Unix(url.path().to_string())),
+ _ => Err(Error::InvalidEndpoint(s.to_string())),
+ }
+ }
+ }
+}
+
+impl Endpoint {
+ /// Creates a new TCP endpoint from a `SocketAddr`.
+ pub fn new_tcp_addr(addr: &SocketAddr) -> Endpoint {
+ Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new UDP endpoint from a `SocketAddr`.
+ pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint {
+ Endpoint::Udp(Addr::Ip(addr.ip()), addr.port())
+ }
+
+ /// Creates a new Unix endpoint from a `UnixSocketAddress`.
+ pub fn new_unix_addr(addr: &UnixSocketAddress) -> Endpoint {
+ Endpoint::Unix(
+ addr.as_pathname()
+ .and_then(|a| a.to_str())
+ .unwrap_or("")
+ .to_string(),
+ )
+ }
+
+ /// Returns the `Port` of the endpoint.
+ pub fn port(&self) -> Result<&Port> {
+ match self {
+ Endpoint::Tcp(_, port) => Ok(port),
+ Endpoint::Udp(_, port) => Ok(port),
+ _ => Err(Error::TryFromEndpointError),
+ }
+ }
+
+ /// Returns the `Addr` of the endpoint.
+ pub fn addr(&self) -> Result<&Addr> {
+ match self {
+ Endpoint::Tcp(addr, _) => Ok(addr),
+ Endpoint::Udp(addr, _) => Ok(addr),
+ _ => Err(Error::TryFromEndpointError),
+ }
+ }
+}
+
+/// Addr defines a type for an address, either IP or domain.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
+pub enum Addr {
+ Ip(IpAddr),
+ Domain(String),
+}
+
+impl TryFrom<Addr> for IpAddr {
+ type Error = Error;
+ fn try_from(addr: Addr) -> std::result::Result<IpAddr, Self::Error> {
+ match addr {
+ Addr::Ip(ip) => Ok(ip),
+ Addr::Domain(d) => Err(Error::InvalidAddress(d)),
+ }
+ }
+}
+
+impl std::fmt::Display for Addr {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match self {
+ Addr::Ip(ip) => {
+ write!(f, "{}", ip)
+ }
+ Addr::Domain(d) => {
+ write!(f, "{}", d)
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::Ipv4Addr;
+
+ #[test]
+ fn test_endpoint_from_str() {
+ let endpoint_str: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap();
+ let endpoint = Endpoint::Tcp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 3000);
+ assert_eq!(endpoint_str, endpoint);
+
+ let endpoint_str: Endpoint = "udp://127.0.0.1:4000".parse().unwrap();
+ let endpoint = Endpoint::Udp(Addr::Ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), 4000);
+ assert_eq!(endpoint_str, endpoint);
+
+ let endpoint_str: Endpoint = "tcp://example.com:3000".parse().unwrap();
+ let endpoint = Endpoint::Tcp(Addr::Domain("example.com".to_string()), 3000);
+ assert_eq!(endpoint_str, endpoint);
+
+ let endpoint_str = "unix:/home/x/s.socket".parse::<Endpoint>().unwrap();
+ let endpoint = Endpoint::Unix("/home/x/s.socket".to_string());
+ assert_eq!(endpoint_str, endpoint);
+ }
+}
diff --git a/net/src/error.rs b/net/src/error.rs
new file mode 100644
index 0000000..a1c85db
--- /dev/null
+++ b/net/src/error.rs
@@ -0,0 +1,45 @@
+use thiserror::Error as ThisError;
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(ThisError, Debug)]
+pub enum Error {
+ #[error("IO Error: {0}")]
+ IO(#[from] std::io::Error),
+
+ #[error("Try from endpoint Error")]
+ TryFromEndpointError,
+
+ #[error("invalid address {0}")]
+ InvalidAddress(String),
+
+ #[error("invalid endpoint {0}")]
+ InvalidEndpoint(String),
+
+ #[error("Parse endpoint error {0}")]
+ ParseEndpoint(String),
+
+ #[error("Timeout Error")]
+ Timeout,
+
+ #[error("Channel Send Error: {0}")]
+ ChannelSend(String),
+
+ #[error("Channel Receive Error: {0}")]
+ ChannelRecv(String),
+
+ #[error("Karyons core error : {0}")]
+ KaryonsCore(#[from] karyons_core::error::Error),
+}
+
+impl<T> From<smol::channel::SendError<T>> for Error {
+ fn from(error: smol::channel::SendError<T>) -> Self {
+ Error::ChannelSend(error.to_string())
+ }
+}
+
+impl From<smol::channel::RecvError> for Error {
+ fn from(error: smol::channel::RecvError) -> Self {
+ Error::ChannelRecv(error.to_string())
+ }
+}
diff --git a/net/src/lib.rs b/net/src/lib.rs
new file mode 100644
index 0000000..914c6d8
--- /dev/null
+++ b/net/src/lib.rs
@@ -0,0 +1,24 @@
+mod connection;
+mod endpoint;
+mod error;
+mod listener;
+mod transports;
+
+pub use {
+ connection::{dial, Conn, Connection},
+ endpoint::{Addr, Endpoint, Port},
+ listener::{listen, Listener},
+ transports::{
+ tcp::{dial_tcp, listen_tcp, TcpConn},
+ udp::{dial_udp, listen_udp, UdpConn},
+ unix::{dial_unix, listen_unix, UnixConn},
+ },
+};
+
+use error::{Error, Result};
+
+/// Represents Karyons's Net Error
+pub use error::Error as NetError;
+
+/// Represents Karyons's Net Result
+pub use error::Result as NetResult;
diff --git a/net/src/listener.rs b/net/src/listener.rs
new file mode 100644
index 0000000..31a63ae
--- /dev/null
+++ b/net/src/listener.rs
@@ -0,0 +1,39 @@
+use crate::{Endpoint, Error, Result};
+use async_trait::async_trait;
+
+use crate::{
+ transports::{tcp, unix},
+ Conn,
+};
+
+/// Listener is a generic network listener.
+#[async_trait]
+pub trait Listener: Send + Sync {
+ fn local_endpoint(&self) -> Result<Endpoint>;
+ async fn accept(&self) -> Result<Conn>;
+}
+
+/// Listens to the provided endpoint.
+///
+/// it only supports `tcp4/6` and `unix`.
+///
+/// #Example
+///
+/// ```
+/// use karyons_net::{Endpoint, listen};
+///
+/// async {
+/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap();
+///
+/// let listener = listen(&endpoint).await.unwrap();
+/// let conn = listener.accept().await.unwrap();
+/// };
+///
+/// ```
+pub async fn listen(endpoint: &Endpoint) -> Result<Box<dyn Listener>> {
+ match endpoint {
+ Endpoint::Tcp(addr, port) => Ok(Box::new(tcp::listen_tcp(addr, port).await?)),
+ Endpoint::Unix(addr) => Ok(Box::new(unix::listen_unix(addr)?)),
+ _ => Err(Error::InvalidEndpoint(endpoint.to_string())),
+ }
+}
diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs
new file mode 100644
index 0000000..f399133
--- /dev/null
+++ b/net/src/transports/mod.rs
@@ -0,0 +1,3 @@
+pub mod tcp;
+pub mod udp;
+pub mod unix;
diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs
new file mode 100644
index 0000000..5ff7b28
--- /dev/null
+++ b/net/src/transports/tcp.rs
@@ -0,0 +1,82 @@
+use async_trait::async_trait;
+
+use smol::{
+ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
+ lock::Mutex,
+ net::{TcpListener, TcpStream},
+};
+
+use crate::{
+ connection::Connection,
+ endpoint::{Addr, Endpoint, Port},
+ listener::Listener,
+ Result,
+};
+
+/// TCP network connection implementations of the `Connection` trait.
+pub struct TcpConn {
+ inner: TcpStream,
+ read: Mutex<ReadHalf<TcpStream>>,
+ write: Mutex<WriteHalf<TcpStream>>,
+}
+
+impl TcpConn {
+ /// Creates a new TcpConn
+ pub fn new(conn: TcpStream) -> Self {
+ let (read, write) = split(conn.clone());
+ Self {
+ inner: conn,
+ read: Mutex::new(read),
+ write: Mutex::new(write),
+ }
+ }
+}
+
+#[async_trait]
+impl Connection for TcpConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?))
+ }
+
+ async fn recv(&self, buf: &mut [u8]) -> Result<usize> {
+ self.read.lock().await.read_exact(buf).await?;
+ Ok(buf.len())
+ }
+
+ async fn send(&self, buf: &[u8]) -> Result<usize> {
+ self.write.lock().await.write_all(buf).await?;
+ Ok(buf.len())
+ }
+}
+
+#[async_trait]
+impl Listener for TcpListener {
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_tcp_addr(&self.local_addr()?))
+ }
+
+ async fn accept(&self) -> Result<Box<dyn Connection>> {
+ let (conn, _) = self.accept().await?;
+ conn.set_nodelay(true)?;
+ Ok(Box::new(TcpConn::new(conn)))
+ }
+}
+
+/// Connects to the given TCP address and port.
+pub async fn dial_tcp(addr: &Addr, port: &Port) -> Result<TcpConn> {
+ let address = format!("{}:{}", addr, port);
+ let conn = TcpStream::connect(address).await?;
+ conn.set_nodelay(true)?;
+ Ok(TcpConn::new(conn))
+}
+
+/// Listens on the given TCP address and port.
+pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result<TcpListener> {
+ let address = format!("{}:{}", addr, port);
+ let listener = TcpListener::bind(address).await?;
+ Ok(listener)
+}
diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs
new file mode 100644
index 0000000..27fb9ae
--- /dev/null
+++ b/net/src/transports/udp.rs
@@ -0,0 +1,77 @@
+use std::net::SocketAddr;
+
+use async_trait::async_trait;
+use smol::net::UdpSocket;
+
+use crate::{
+ connection::Connection,
+ endpoint::{Addr, Endpoint, Port},
+ Result,
+};
+
+/// UDP network connection implementations of the `Connection` trait.
+pub struct UdpConn {
+ inner: UdpSocket,
+}
+
+impl UdpConn {
+ /// Creates a new UdpConn
+ pub fn new(conn: UdpSocket) -> Self {
+ Self { inner: conn }
+ }
+}
+
+impl UdpConn {
+ /// Receives a single datagram message. Returns the number of bytes read
+ /// and the origin endpoint.
+ pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> {
+ let (size, addr) = self.inner.recv_from(buf).await?;
+ Ok((size, Endpoint::new_udp_addr(&addr)))
+ }
+
+ /// Sends data to the given address. Returns the number of bytes written.
+ pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> {
+ let addr: SocketAddr = addr.clone().try_into()?;
+ let size = self.inner.send_to(buf, addr).await?;
+ Ok(size)
+ }
+}
+
+#[async_trait]
+impl Connection for UdpConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?))
+ }
+
+ async fn recv(&self, buf: &mut [u8]) -> Result<usize> {
+ let size = self.inner.recv(buf).await?;
+ Ok(size)
+ }
+
+ async fn send(&self, buf: &[u8]) -> Result<usize> {
+ let size = self.inner.send(buf).await?;
+ Ok(size)
+ }
+}
+
+/// Connects to the given UDP address and port.
+pub async fn dial_udp(addr: &Addr, port: &Port) -> Result<UdpConn> {
+ let address = format!("{}:{}", addr, port);
+
+ // Let the operating system assign an available port to this socket
+ let conn = UdpSocket::bind("[::]:0").await?;
+ conn.connect(address).await?;
+ Ok(UdpConn::new(conn))
+}
+
+/// Listens on the given UDP address and port.
+pub async fn listen_udp(addr: &Addr, port: &Port) -> Result<UdpConn> {
+ let address = format!("{}:{}", addr, port);
+ let conn = UdpSocket::bind(address).await?;
+ let udp_conn = UdpConn::new(conn);
+ Ok(udp_conn)
+}
diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs
new file mode 100644
index 0000000..c89832e
--- /dev/null
+++ b/net/src/transports/unix.rs
@@ -0,0 +1,73 @@
+use async_trait::async_trait;
+
+use smol::{
+ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
+ lock::Mutex,
+ net::unix::{UnixListener, UnixStream},
+};
+
+use crate::{connection::Connection, endpoint::Endpoint, listener::Listener, Result};
+
+/// Unix domain socket implementations of the `Connection` trait.
+pub struct UnixConn {
+ inner: UnixStream,
+ read: Mutex<ReadHalf<UnixStream>>,
+ write: Mutex<WriteHalf<UnixStream>>,
+}
+
+impl UnixConn {
+ /// Creates a new UnixConn
+ pub fn new(conn: UnixStream) -> Self {
+ let (read, write) = split(conn.clone());
+ Self {
+ inner: conn,
+ read: Mutex::new(read),
+ write: Mutex::new(write),
+ }
+ }
+}
+
+#[async_trait]
+impl Connection for UnixConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?))
+ }
+
+ async fn recv(&self, buf: &mut [u8]) -> Result<usize> {
+ self.read.lock().await.read_exact(buf).await?;
+ Ok(buf.len())
+ }
+
+ async fn send(&self, buf: &[u8]) -> Result<usize> {
+ self.write.lock().await.write_all(buf).await?;
+ Ok(buf.len())
+ }
+}
+
+#[async_trait]
+impl Listener for UnixListener {
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_unix_addr(&self.local_addr()?))
+ }
+
+ async fn accept(&self) -> Result<Box<dyn Connection>> {
+ let (conn, _) = self.accept().await?;
+ Ok(Box::new(UnixConn::new(conn)))
+ }
+}
+
+/// Connects to the given Unix socket path.
+pub async fn dial_unix(path: &String) -> Result<UnixConn> {
+ let conn = UnixStream::connect(path).await?;
+ Ok(UnixConn::new(conn))
+}
+
+/// Listens on the given Unix socket path.
+pub fn listen_unix(path: &String) -> Result<UnixListener> {
+ let listener = UnixListener::bind(path)?;
+ Ok(listener)
+}