diff options
Diffstat (limited to 'net/src/transports/unix.rs')
-rw-r--r-- | net/src/transports/unix.rs | 193 |
1 files changed, 137 insertions, 56 deletions
diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index 494e104..bafebaf 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -1,111 +1,192 @@ use async_trait::async_trait; +use futures_util::SinkExt; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, - net::unix::{UnixListener, UnixStream}, + net::{UnixListener as AsyncUnixListener, UnixStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, Error, Result, }; +/// Unix Conn config +#[derive(Clone, Default)] +pub struct UnixConfig {} + /// Unix domain socket implementation of the [`Connection`] trait. -pub struct UnixConn { - inner: UnixStream, - read: Mutex<ReadHalf<UnixStream>>, - write: Mutex<WriteHalf<UnixStream>>, +pub struct UnixConn<C> { + read_stream: Mutex<ReadStream<ReadHalf<UnixStream>, C>>, + write_stream: Mutex<WriteStream<WriteHalf<UnixStream>, C>>, + peer_endpoint: Option<Endpoint>, + local_endpoint: Option<Endpoint>, } -impl UnixConn { - /// Creates a new UnixConn - pub fn new(conn: UnixStream) -> Self { - let (read, write) = split(conn.clone()); +impl<C> UnixConn<C> +where + C: Codec + Clone, +{ + /// Creates a new TcpConn + pub fn new(conn: UnixStream, codec: C) -> Self { + let peer_endpoint = conn + .peer_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + let local_endpoint = conn + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + + let (read, write) = split(conn); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for UnixConn { +impl<C> Connection for UnixConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) + self.peer_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) + self.local_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await + } +} + +#[allow(dead_code)] +pub struct UnixListener<C> { + inner: AsyncUnixListener, + config: UnixConfig, + codec: C, +} + +impl<C> UnixListener<C> +where + C: Codec + Clone, +{ + pub fn new(listener: AsyncUnixListener, config: UnixConfig, codec: C) -> Self { + Self { + inner: listener, + config, + codec, + } } } #[async_trait] -impl ConnListener for UnixListener { +impl<C> ConnListener for UnixListener<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.local_addr()?)) + self.inner + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .map_err(Error::from) } - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (conn, _) = self.accept().await?; - Ok(Box::new(UnixConn::new(conn))) + async fn accept(&self) -> Result<Conn<C::Item>> { + let (conn, _) = self.inner.accept().await?; + Ok(Box::new(UnixConn::new(conn, self.codec.clone()))) } } /// Connects to the given Unix socket path. -pub async fn dial(path: &String) -> Result<UnixConn> { +pub async fn dial<C>(endpoint: &Endpoint, _config: UnixConfig, codec: C) -> Result<UnixConn<C>> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; let conn = UnixStream::connect(path).await?; - Ok(UnixConn::new(conn)) + Ok(UnixConn::new(conn, codec)) } /// Listens on the given Unix socket path. -pub fn listen(path: &String) -> Result<UnixListener> { - let listener = UnixListener::bind(path)?; - Ok(listener) -} - -impl From<UnixStream> for Box<dyn Connection> { - fn from(conn: UnixStream) -> Self { - Box::new(UnixConn::new(conn)) - } +pub fn listen<C>(endpoint: &Endpoint, config: UnixConfig, codec: C) -> Result<UnixListener<C>> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; + let listener = AsyncUnixListener::bind(path)?; + Ok(UnixListener::new(listener, config, codec)) } -impl From<UnixListener> for Box<dyn ConnListener> { - fn from(listener: UnixListener) -> Self { +// impl From<UnixStream> for Box<dyn Connection> { +// fn from(conn: UnixStream) -> Self { +// Box::new(UnixConn::new(conn)) +// } +// } + +impl<C> From<UnixListener<C>> for Listener<C::Item> +where + C: Codec + Clone, +{ + fn from(listener: UnixListener<C>) -> Self { Box::new(listener) } } -impl ToConn for UnixStream { - fn to_conn(self) -> Box<dyn Connection> { - self.into() - } -} - -impl ToConn for UnixConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for UnixConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } -impl ToListener for UnixListener { - fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for UnixListener<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_listener(self) -> Listener<Self::Item> { self.into() } } |