diff options
| author | hozan23 <hozan23@karyontech.net> | 2024-04-11 10:19:20 +0200 | 
|---|---|---|
| committer | hozan23 <hozan23@karyontech.net> | 2024-05-19 13:51:30 +0200 | 
| commit | 0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch) | |
| tree | 961d73218af672797d49f899289bef295bc56493 /net/src/transports/unix.rs | |
| parent | a69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff) | |
add support for tokio & improve net crate api
Diffstat (limited to 'net/src/transports/unix.rs')
| -rw-r--r-- | net/src/transports/unix.rs | 193 | 
1 files changed, 137 insertions, 56 deletions
| diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index 494e104..bafebaf 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -1,111 +1,192 @@  use async_trait::async_trait; +use futures_util::SinkExt; -use smol::{ -    io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use karyon_core::async_runtime::{ +    io::{split, ReadHalf, WriteHalf},      lock::Mutex, -    net::unix::{UnixListener, UnixStream}, +    net::{UnixListener as AsyncUnixListener, UnixStream},  };  use crate::{ -    connection::{Connection, ToConn}, +    codec::Codec, +    connection::{Conn, Connection, ToConn},      endpoint::Endpoint, -    listener::{ConnListener, ToListener}, +    listener::{ConnListener, Listener, ToListener}, +    stream::{ReadStream, WriteStream},      Error, Result,  }; +/// Unix Conn config +#[derive(Clone, Default)] +pub struct UnixConfig {} +  /// Unix domain socket implementation of the [`Connection`] trait. -pub struct UnixConn { -    inner: UnixStream, -    read: Mutex<ReadHalf<UnixStream>>, -    write: Mutex<WriteHalf<UnixStream>>, +pub struct UnixConn<C> { +    read_stream: Mutex<ReadStream<ReadHalf<UnixStream>, C>>, +    write_stream: Mutex<WriteStream<WriteHalf<UnixStream>, C>>, +    peer_endpoint: Option<Endpoint>, +    local_endpoint: Option<Endpoint>,  } -impl UnixConn { -    /// Creates a new UnixConn -    pub fn new(conn: UnixStream) -> Self { -        let (read, write) = split(conn.clone()); +impl<C> UnixConn<C> +where +    C: Codec + Clone, +{ +    /// Creates a new TcpConn +    pub fn new(conn: UnixStream, codec: C) -> Self { +        let peer_endpoint = conn +            .peer_addr() +            .and_then(|a| { +                Ok(Endpoint::new_unix_addr( +                    a.as_pathname() +                        .ok_or(std::io::ErrorKind::AddrNotAvailable)?, +                )) +            }) +            .ok(); +        let local_endpoint = conn +            .local_addr() +            .and_then(|a| { +                Ok(Endpoint::new_unix_addr( +                    a.as_pathname() +                        .ok_or(std::io::ErrorKind::AddrNotAvailable)?, +                )) +            }) +            .ok(); + +        let (read, write) = split(conn); +        let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); +        let write_stream = Mutex::new(WriteStream::new(write, codec));          Self { -            inner: conn, -            read: Mutex::new(read), -            write: Mutex::new(write), +            read_stream, +            write_stream, +            peer_endpoint, +            local_endpoint,          }      }  }  #[async_trait] -impl Connection for UnixConn { +impl<C> Connection for UnixConn<C> +where +    C: Codec + Clone, +{ +    type Item = C::Item;      fn peer_endpoint(&self) -> Result<Endpoint> { -        Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) +        self.peer_endpoint +            .clone() +            .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into()))      }      fn local_endpoint(&self) -> Result<Endpoint> { -        Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) +        self.local_endpoint +            .clone() +            .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into()))      } -    async fn read(&self, buf: &mut [u8]) -> Result<usize> { -        self.read.lock().await.read(buf).await.map_err(Error::from) +    async fn recv(&self) -> Result<Self::Item> { +        self.read_stream.lock().await.recv().await      } -    async fn write(&self, buf: &[u8]) -> Result<usize> { -        self.write -            .lock() -            .await -            .write(buf) -            .await -            .map_err(Error::from) +    async fn send(&self, msg: Self::Item) -> Result<()> { +        self.write_stream.lock().await.send(msg).await +    } +} + +#[allow(dead_code)] +pub struct UnixListener<C> { +    inner: AsyncUnixListener, +    config: UnixConfig, +    codec: C, +} + +impl<C> UnixListener<C> +where +    C: Codec + Clone, +{ +    pub fn new(listener: AsyncUnixListener, config: UnixConfig, codec: C) -> Self { +        Self { +            inner: listener, +            config, +            codec, +        }      }  }  #[async_trait] -impl ConnListener for UnixListener { +impl<C> ConnListener for UnixListener<C> +where +    C: Codec + Clone, +{ +    type Item = C::Item;      fn local_endpoint(&self) -> Result<Endpoint> { -        Ok(Endpoint::new_unix_addr(&self.local_addr()?)) +        self.inner +            .local_addr() +            .and_then(|a| { +                Ok(Endpoint::new_unix_addr( +                    a.as_pathname() +                        .ok_or(std::io::ErrorKind::AddrNotAvailable)?, +                )) +            }) +            .map_err(Error::from)      } -    async fn accept(&self) -> Result<Box<dyn Connection>> { -        let (conn, _) = self.accept().await?; -        Ok(Box::new(UnixConn::new(conn))) +    async fn accept(&self) -> Result<Conn<C::Item>> { +        let (conn, _) = self.inner.accept().await?; +        Ok(Box::new(UnixConn::new(conn, self.codec.clone())))      }  }  /// Connects to the given Unix socket path. -pub async fn dial(path: &String) -> Result<UnixConn> { +pub async fn dial<C>(endpoint: &Endpoint, _config: UnixConfig, codec: C) -> Result<UnixConn<C>> +where +    C: Codec + Clone, +{ +    let path: std::path::PathBuf = endpoint.clone().try_into()?;      let conn = UnixStream::connect(path).await?; -    Ok(UnixConn::new(conn)) +    Ok(UnixConn::new(conn, codec))  }  /// Listens on the given Unix socket path. -pub fn listen(path: &String) -> Result<UnixListener> { -    let listener = UnixListener::bind(path)?; -    Ok(listener) -} - -impl From<UnixStream> for Box<dyn Connection> { -    fn from(conn: UnixStream) -> Self { -        Box::new(UnixConn::new(conn)) -    } +pub fn listen<C>(endpoint: &Endpoint, config: UnixConfig, codec: C) -> Result<UnixListener<C>> +where +    C: Codec + Clone, +{ +    let path: std::path::PathBuf = endpoint.clone().try_into()?; +    let listener = AsyncUnixListener::bind(path)?; +    Ok(UnixListener::new(listener, config, codec))  } -impl From<UnixListener> for Box<dyn ConnListener> { -    fn from(listener: UnixListener) -> Self { +// impl From<UnixStream> for Box<dyn Connection> { +//     fn from(conn: UnixStream) -> Self { +//         Box::new(UnixConn::new(conn)) +//     } +// } + +impl<C> From<UnixListener<C>> for Listener<C::Item> +where +    C: Codec + Clone, +{ +    fn from(listener: UnixListener<C>) -> Self {          Box::new(listener)      }  } -impl ToConn for UnixStream { -    fn to_conn(self) -> Box<dyn Connection> { -        self.into() -    } -} - -impl ToConn for UnixConn { -    fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for UnixConn<C> +where +    C: Codec + Clone, +{ +    type Item = C::Item; +    fn to_conn(self) -> Conn<Self::Item> {          Box::new(self)      }  } -impl ToListener for UnixListener { -    fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for UnixListener<C> +where +    C: Codec + Clone, +{ +    type Item = C::Item; +    fn to_listener(self) -> Listener<Self::Item> {          self.into()      }  } | 
