From 340957fec147f4429796413f27bbd9b84ba6f141 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 14 Mar 2024 17:01:59 +0100 Subject: net: add support for websocket protocol --- Cargo.lock | 190 ++++++++++++++++++++++++++++++++++++++++++++++ net/Cargo.toml | 2 + net/src/error.rs | 3 + net/src/lib.rs | 2 +- net/src/transports/mod.rs | 1 + net/src/transports/ws.rs | 112 +++++++++++++++++++++++++++ 6 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 net/src/transports/ws.rs diff --git a/Cargo.lock b/Cargo.lock index 83648f1..8d42b07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,30 @@ dependencies = [ "syn", ] +[[package]] +name = "async-tungstenite" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0f8d64ef9351752fbe5462f242c625d9c4910d2bc3f7ec44c43857ca123f5d" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tungstenite", +] + +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -462,6 +486,18 @@ version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + [[package]] name = "cc" version = "1.0.90" @@ -879,6 +915,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -894,6 +936,21 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -901,6 +958,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -909,6 +967,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -943,6 +1012,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-rustls" version = "0.25.1" @@ -954,6 +1034,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + [[package]] name = "futures-task" version = "0.3.30" @@ -966,8 +1052,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1033,6 +1124,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + [[package]] name = "humantime" version = "2.1.0" @@ -1153,6 +1261,7 @@ name = "karyon_net" version = "0.1.0" dependencies = [ "async-trait", + "async-tungstenite", "bincode", "futures-rustls", "karyon_core", @@ -1160,6 +1269,7 @@ dependencies = [ "smol", "thiserror", "url", + "ws_stream_tungstenite", ] [[package]] @@ -1382,6 +1492,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1723,6 +1843,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1917,14 +2048,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] [[package]] name = "typenum" @@ -1976,6 +2141,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -2263,6 +2434,25 @@ version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +[[package]] +name = "ws_stream_tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a198f414f083fb19fcc1bffcb0fa0cf46d33ccfa229adf248cac12c180e91609" +dependencies = [ + "async-tungstenite", + "async_io_stream", + "bitflags 2.4.2", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "pharos", + "rustc_version", + "tracing", + "tungstenite", +] + [[package]] name = "x509-parser" version = "0.16.0" diff --git a/net/Cargo.toml b/net/Cargo.toml index 0b6534c..fe209cd 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -15,3 +15,5 @@ bincode = { version="2.0.0-rc.3", features = ["derive"]} thiserror = "1.0.58" url = "2.5.0" futures-rustls = "0.25.1" +async-tungstenite = "0.25.0" +ws_stream_tungstenite = "0.13.0" diff --git a/net/src/error.rs b/net/src/error.rs index be90a03..6e04a12 100644 --- a/net/src/error.rs +++ b/net/src/error.rs @@ -28,6 +28,9 @@ pub enum Error { #[error(transparent)] ChannelRecv(#[from] smol::channel::RecvError), + #[error("Ws Error: {0}")] + WsError(#[from] async_tungstenite::tungstenite::Error), + #[error("Tls Error: {0}")] Rustls(#[from] futures_rustls::rustls::Error), diff --git a/net/src/lib.rs b/net/src/lib.rs index 5f1c8a6..c1d72b2 100644 --- a/net/src/lib.rs +++ b/net/src/lib.rs @@ -8,7 +8,7 @@ pub use { connection::{dial, Conn, Connection, ToConn}, endpoint::{Addr, Endpoint, Port}, listener::{listen, ConnListener, Listener, ToListener}, - transports::{tcp, tls, udp, unix}, + transports::{tcp, tls, udp, unix, ws}, }; use error::{Error, Result}; diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs index ac23021..14ef6f3 100644 --- a/net/src/transports/mod.rs +++ b/net/src/transports/mod.rs @@ -2,3 +2,4 @@ pub mod tcp; pub mod tls; pub mod udp; pub mod unix; +pub mod ws; diff --git a/net/src/transports/ws.rs b/net/src/transports/ws.rs new file mode 100644 index 0000000..eaf3b9b --- /dev/null +++ b/net/src/transports/ws.rs @@ -0,0 +1,112 @@ +use std::net::SocketAddr; + +use async_trait::async_trait; +use smol::{ + io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + lock::Mutex, + net::{TcpListener, TcpStream}, +}; + +use ws_stream_tungstenite::WsStream; + +use crate::{ + connection::{Connection, ToConn}, + endpoint::Endpoint, + listener::{ConnListener, ToListener}, + Error, Result, +}; + +/// WS network connection implementation of the [`Connection`] trait. +pub struct WsConn { + inner: TcpStream, + read: Mutex>>, + write: Mutex>>, +} + +impl WsConn { + /// Creates a new WsConn + pub fn new(inner: TcpStream, conn: WsStream) -> Self { + let (read, write) = split(conn); + Self { + inner, + read: Mutex::new(read), + write: Mutex::new(write), + } + } +} + +#[async_trait] +impl Connection for WsConn { + fn peer_endpoint(&self) -> Result { + Ok(Endpoint::new_ws_addr(&self.inner.peer_addr()?)) + } + + fn local_endpoint(&self) -> Result { + Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?)) + } + + async fn read(&self, buf: &mut [u8]) -> Result { + self.read.lock().await.read(buf).await.map_err(Error::from) + } + + async fn write(&self, buf: &[u8]) -> Result { + self.write + .lock() + .await + .write(buf) + .await + .map_err(Error::from) + } +} + +/// Ws network listener implementation of the `Listener` [`ConnListener`] trait. +pub struct WsListener { + inner: TcpListener, +} + +#[async_trait] +impl ConnListener for WsListener { + fn local_endpoint(&self) -> Result { + Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?)) + } + + async fn accept(&self) -> Result> { + let (stream, _) = self.inner.accept().await?; + let conn = async_tungstenite::accept_async(stream.clone()).await?; + Ok(Box::new(WsConn::new(stream, WsStream::new(conn)))) + } +} + +/// Connects to the given WS address and port. +pub async fn dial(endpoint: &Endpoint) -> Result { + let addr = SocketAddr::try_from(endpoint.clone())?; + let stream = TcpStream::connect(addr).await?; + let (conn, _resp) = + async_tungstenite::client_async(endpoint.to_string(), stream.clone()).await?; + Ok(WsConn::new(stream, WsStream::new(conn))) +} + +/// Listens on the given WS address and port. +pub async fn listen(endpoint: &Endpoint) -> Result { + let addr = SocketAddr::try_from(endpoint.clone())?; + let listener = TcpListener::bind(addr).await?; + Ok(WsListener { inner: listener }) +} + +impl From for Box { + fn from(listener: WsListener) -> Self { + Box::new(listener) + } +} + +impl ToConn for WsConn { + fn to_conn(self) -> Box { + Box::new(self) + } +} + +impl ToListener for WsListener { + fn to_listener(self) -> Box { + self.into() + } +} -- cgit v1.2.3