From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- net/src/transports/unix.rs | 193 ++++++++++++++++++++++++++++++++------------- 1 file changed, 137 insertions(+), 56 deletions(-) (limited to 'net/src/transports/unix.rs') diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index 494e104..bafebaf 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -1,111 +1,192 @@ use async_trait::async_trait; +use futures_util::SinkExt; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, - net::unix::{UnixListener, UnixStream}, + net::{UnixListener as AsyncUnixListener, UnixStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, Error, Result, }; +/// Unix Conn config +#[derive(Clone, Default)] +pub struct UnixConfig {} + /// Unix domain socket implementation of the [`Connection`] trait. -pub struct UnixConn { - inner: UnixStream, - read: Mutex>, - write: Mutex>, +pub struct UnixConn { + read_stream: Mutex, C>>, + write_stream: Mutex, C>>, + peer_endpoint: Option, + local_endpoint: Option, } -impl UnixConn { - /// Creates a new UnixConn - pub fn new(conn: UnixStream) -> Self { - let (read, write) = split(conn.clone()); +impl UnixConn +where + C: Codec + Clone, +{ + /// Creates a new TcpConn + pub fn new(conn: UnixStream, codec: C) -> Self { + let peer_endpoint = conn + .peer_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + let local_endpoint = conn + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + + let (read, write) = split(conn); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for UnixConn { +impl Connection for UnixConn +where + C: Codec + Clone, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result { - Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) + self.peer_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } fn local_endpoint(&self) -> Result { - Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) + self.local_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } - async fn read(&self, buf: &mut [u8]) -> Result { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await + } +} + +#[allow(dead_code)] +pub struct UnixListener { + inner: AsyncUnixListener, + config: UnixConfig, + codec: C, +} + +impl UnixListener +where + C: Codec + Clone, +{ + pub fn new(listener: AsyncUnixListener, config: UnixConfig, codec: C) -> Self { + Self { + inner: listener, + config, + codec, + } } } #[async_trait] -impl ConnListener for UnixListener { +impl ConnListener for UnixListener +where + C: Codec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result { - Ok(Endpoint::new_unix_addr(&self.local_addr()?)) + self.inner + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .map_err(Error::from) } - async fn accept(&self) -> Result> { - let (conn, _) = self.accept().await?; - Ok(Box::new(UnixConn::new(conn))) + async fn accept(&self) -> Result> { + let (conn, _) = self.inner.accept().await?; + Ok(Box::new(UnixConn::new(conn, self.codec.clone()))) } } /// Connects to the given Unix socket path. -pub async fn dial(path: &String) -> Result { +pub async fn dial(endpoint: &Endpoint, _config: UnixConfig, codec: C) -> Result> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; let conn = UnixStream::connect(path).await?; - Ok(UnixConn::new(conn)) + Ok(UnixConn::new(conn, codec)) } /// Listens on the given Unix socket path. -pub fn listen(path: &String) -> Result { - let listener = UnixListener::bind(path)?; - Ok(listener) -} - -impl From for Box { - fn from(conn: UnixStream) -> Self { - Box::new(UnixConn::new(conn)) - } +pub fn listen(endpoint: &Endpoint, config: UnixConfig, codec: C) -> Result> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; + let listener = AsyncUnixListener::bind(path)?; + Ok(UnixListener::new(listener, config, codec)) } -impl From for Box { - fn from(listener: UnixListener) -> Self { +// impl From for Box { +// fn from(conn: UnixStream) -> Self { +// Box::new(UnixConn::new(conn)) +// } +// } + +impl From> for Listener +where + C: Codec + Clone, +{ + fn from(listener: UnixListener) -> Self { Box::new(listener) } } -impl ToConn for UnixStream { - fn to_conn(self) -> Box { - self.into() - } -} - -impl ToConn for UnixConn { - fn to_conn(self) -> Box { +impl ToConn for UnixConn +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn { Box::new(self) } } -impl ToListener for UnixListener { - fn to_listener(self) -> Box { +impl ToListener for UnixListener +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_listener(self) -> Listener { self.into() } } -- cgit v1.2.3