use async_trait::async_trait; use futures_util::SinkExt; use karyon_core::async_runtime::{ io::{split, ReadHalf, WriteHalf}, lock::Mutex, net::{UnixListener as AsyncUnixListener, UnixStream}, }; use crate::{ codec::Codec, connection::{Conn, Connection, ToConn}, endpoint::Endpoint, listener::{ConnListener, Listener, ToListener}, stream::{ReadStream, WriteStream}, Error, Result, }; /// Unix Conn config #[derive(Clone, Default)] pub struct UnixConfig {} /// Unix domain socket implementation of the [`Connection`] trait. pub struct UnixConn { read_stream: Mutex, C>>, write_stream: Mutex, C>>, peer_endpoint: Option, local_endpoint: Option, } impl UnixConn where C: Codec + Clone, { /// Creates a new TcpConn pub fn new(conn: UnixStream, codec: C) -> Self { let peer_endpoint = conn .peer_addr() .and_then(|a| { Ok(Endpoint::new_unix_addr( a.as_pathname() .ok_or(std::io::ErrorKind::AddrNotAvailable)?, )) }) .ok(); let local_endpoint = conn .local_addr() .and_then(|a| { Ok(Endpoint::new_unix_addr( a.as_pathname() .ok_or(std::io::ErrorKind::AddrNotAvailable)?, )) }) .ok(); let (read, write) = split(conn); let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { read_stream, write_stream, peer_endpoint, local_endpoint, } } } #[async_trait] impl Connection for UnixConn where C: Codec + Clone, { type Item = C::Item; fn peer_endpoint(&self) -> Result { self.peer_endpoint .clone() .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } fn local_endpoint(&self) -> Result { self.local_endpoint .clone() .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } async fn recv(&self) -> Result { self.read_stream.lock().await.recv().await } async fn send(&self, msg: Self::Item) -> Result<()> { self.write_stream.lock().await.send(msg).await } } #[allow(dead_code)] pub struct UnixListener { inner: AsyncUnixListener, config: UnixConfig, codec: C, } impl UnixListener where C: Codec + Clone, { pub fn new(listener: AsyncUnixListener, config: UnixConfig, codec: C) -> Self { Self { inner: listener, config, codec, } } } #[async_trait] impl ConnListener for UnixListener where C: Codec + Clone, { type Item = C::Item; fn local_endpoint(&self) -> Result { self.inner .local_addr() .and_then(|a| { Ok(Endpoint::new_unix_addr( a.as_pathname() .ok_or(std::io::ErrorKind::AddrNotAvailable)?, )) }) .map_err(Error::from) } async fn accept(&self) -> Result> { let (conn, _) = self.inner.accept().await?; Ok(Box::new(UnixConn::new(conn, self.codec.clone()))) } } /// Connects to the given Unix socket path. pub async fn dial(endpoint: &Endpoint, _config: UnixConfig, codec: C) -> Result> where C: Codec + Clone, { let path: std::path::PathBuf = endpoint.clone().try_into()?; let conn = UnixStream::connect(path).await?; Ok(UnixConn::new(conn, codec)) } /// Listens on the given Unix socket path. pub fn listen(endpoint: &Endpoint, config: UnixConfig, codec: C) -> Result> where C: Codec + Clone, { let path: std::path::PathBuf = endpoint.clone().try_into()?; let listener = AsyncUnixListener::bind(path)?; Ok(UnixListener::new(listener, config, codec)) } // impl From for Box { // fn from(conn: UnixStream) -> Self { // Box::new(UnixConn::new(conn)) // } // } impl From> for Listener where C: Codec + Clone, { fn from(listener: UnixListener) -> Self { Box::new(listener) } } impl ToConn for UnixConn where C: Codec + Clone, { type Item = C::Item; fn to_conn(self) -> Conn { Box::new(self) } } impl ToListener for UnixListener where C: Codec + Clone, { type Item = C::Item; fn to_listener(self) -> Listener { self.into() } }