From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- net/src/transports/tcp.rs | 188 +++++++++++++++++++++++++++++++--------------- 1 file changed, 128 insertions(+), 60 deletions(-) (limited to 'net/src/transports/tcp.rs') diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs index 21fce3d..03c8ab2 100644 --- a/net/src/transports/tcp.rs +++ b/net/src/transports/tcp.rs @@ -1,116 +1,184 @@ use std::net::SocketAddr; use async_trait::async_trait; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use futures_util::SinkExt; + +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, - net::{TcpListener, TcpStream}, + net::{TcpListener as AsyncTcpListener, TcpStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, - Error, Result, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, + Result, }; -/// TCP network connection implementation of the [`Connection`] trait. -pub struct TcpConn { - inner: TcpStream, - read: Mutex>, - write: Mutex>, +/// TCP configuration +#[derive(Clone)] +pub struct TcpConfig { + pub nodelay: bool, +} + +impl Default for TcpConfig { + fn default() -> Self { + Self { nodelay: true } + } +} + +/// TCP connection implementation of the [`Connection`] trait. +pub struct TcpConn { + read_stream: Mutex, C>>, + write_stream: Mutex, C>>, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, } -impl TcpConn { +impl TcpConn +where + C: Codec + Clone, +{ /// Creates a new TcpConn - pub fn new(conn: TcpStream) -> Self { - let (read, write) = split(conn.clone()); + pub fn new( + socket: TcpStream, + codec: C, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, + ) -> Self { + let (read, write) = split(socket); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for TcpConn { +impl Connection for TcpConn +where + C: Codec + Clone, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result { - Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?)) + Ok(self.peer_endpoint.clone()) } fn local_endpoint(&self) -> Result { - Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?)) + Ok(self.local_endpoint.clone()) } - async fn read(&self, buf: &mut [u8]) -> Result { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await + } +} + +pub struct TcpListener { + inner: AsyncTcpListener, + config: TcpConfig, + codec: C, +} + +impl TcpListener +where + C: Codec, +{ + pub fn new(listener: AsyncTcpListener, config: TcpConfig, codec: C) -> Self { + Self { + inner: listener, + config: config.clone(), + codec, + } } } #[async_trait] -impl ConnListener for TcpListener { +impl ConnListener for TcpListener +where + C: Codec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result { - Ok(Endpoint::new_tcp_addr(&self.local_addr()?)) + Ok(Endpoint::new_tcp_addr(self.inner.local_addr()?)) } - async fn accept(&self) -> Result> { - let (conn, _) = self.accept().await?; - conn.set_nodelay(true)?; - Ok(Box::new(TcpConn::new(conn))) + async fn accept(&self) -> Result> { + let (socket, _) = self.inner.accept().await?; + socket.set_nodelay(self.config.nodelay)?; + + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tcp_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tcp_addr)?; + + Ok(Box::new(TcpConn::new( + socket, + self.codec.clone(), + peer_endpoint, + local_endpoint, + ))) } } /// Connects to the given TCP address and port. -pub async fn dial(endpoint: &Endpoint) -> Result { +pub async fn dial(endpoint: &Endpoint, config: TcpConfig, codec: C) -> Result> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let conn = TcpStream::connect(addr).await?; - conn.set_nodelay(true)?; - Ok(TcpConn::new(conn)) + let socket = TcpStream::connect(addr).await?; + socket.set_nodelay(config.nodelay)?; + + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tcp_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tcp_addr)?; + + Ok(TcpConn::new(socket, codec, peer_endpoint, local_endpoint)) } /// Listens on the given TCP address and port. -pub async fn listen(endpoint: &Endpoint) -> Result { +pub async fn listen(endpoint: &Endpoint, config: TcpConfig, codec: C) -> Result> +where + C: Codec, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let listener = TcpListener::bind(addr).await?; - Ok(listener) -} - -impl From for Box { - fn from(conn: TcpStream) -> Self { - Box::new(TcpConn::new(conn)) - } + let listener = AsyncTcpListener::bind(addr).await?; + Ok(TcpListener::new(listener, config, codec)) } -impl From for Box { - fn from(listener: TcpListener) -> Self { +impl From> for Box> +where + C: Clone + Codec, +{ + fn from(listener: TcpListener) -> Self { Box::new(listener) } } -impl ToConn for TcpStream { - fn to_conn(self) -> Box { - self.into() - } -} - -impl ToConn for TcpConn { - fn to_conn(self) -> Box { +impl ToConn for TcpConn +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn { Box::new(self) } } -impl ToListener for TcpListener { - fn to_listener(self) -> Box { +impl ToListener for TcpListener +where + C: Clone + Codec, +{ + type Item = C::Item; + fn to_listener(self) -> Listener { self.into() } } -- cgit v1.2.3