aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2024-03-14 17:01:59 +0100
committerhozan23 <hozan23@proton.me>2024-03-14 17:03:19 +0100
commit340957fec147f4429796413f27bbd9b84ba6f141 (patch)
tree72864defbb3f87550e801391b378957b06addf48
parent4923c76c957af8391a7542288943a1399067215c (diff)
net: add support for websocket protocol
-rw-r--r--Cargo.lock190
-rw-r--r--net/Cargo.toml2
-rw-r--r--net/src/error.rs3
-rw-r--r--net/src/lib.rs2
-rw-r--r--net/src/transports/mod.rs1
-rw-r--r--net/src/transports/ws.rs112
6 files changed, 309 insertions, 1 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 83648f1..8d42b07 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -327,6 +327,30 @@ dependencies = [
]
[[package]]
+name = "async-tungstenite"
+version = "0.25.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef0f8d64ef9351752fbe5462f242c625d9c4910d2bc3f7ec44c43857ca123f5d"
+dependencies = [
+ "futures-io",
+ "futures-util",
+ "log",
+ "pin-project-lite",
+ "tungstenite",
+]
+
+[[package]]
+name = "async_io_stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
+dependencies = [
+ "futures",
+ "pharos",
+ "rustc_version",
+]
+
+[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -463,6 +487,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa"
[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
+[[package]]
+name = "bytes"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+
+[[package]]
name = "cc"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -880,6 +916,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382"
[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
+[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -895,12 +937,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
+name = "futures"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
@@ -910,6 +968,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
+name = "futures-executor"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -944,6 +1013,17 @@ dependencies = [
]
[[package]]
+name = "futures-macro"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "futures-rustls"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -955,6 +1035,12 @@ dependencies = [
]
[[package]]
+name = "futures-sink"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
+
+[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -966,8 +1052,13 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -1034,6 +1125,23 @@ dependencies = [
]
[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "httparse"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
+
+[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1153,6 +1261,7 @@ name = "karyon_net"
version = "0.1.0"
dependencies = [
"async-trait",
+ "async-tungstenite",
"bincode",
"futures-rustls",
"karyon_core",
@@ -1160,6 +1269,7 @@ dependencies = [
"smol",
"thiserror",
"url",
+ "ws_stream_tungstenite",
]
[[package]]
@@ -1383,6 +1493,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
+name = "pharos"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
+dependencies = [
+ "futures",
+ "rustc_version",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1724,6 +1844,17 @@ dependencies = [
]
[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
+[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1917,14 +2048,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"pin-project-lite",
+ "tracing-attributes",
"tracing-core",
]
[[package]]
+name = "tracing-attributes"
+version = "0.1.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "tracing-core"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
+dependencies = [
+ "once_cell",
+]
+
+[[package]]
+name = "tungstenite"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
[[package]]
name = "typenum"
@@ -1977,6 +2142,12 @@ dependencies = [
]
[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
+[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2264,6 +2435,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
[[package]]
+name = "ws_stream_tungstenite"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a198f414f083fb19fcc1bffcb0fa0cf46d33ccfa229adf248cac12c180e91609"
+dependencies = [
+ "async-tungstenite",
+ "async_io_stream",
+ "bitflags 2.4.2",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-util",
+ "pharos",
+ "rustc_version",
+ "tracing",
+ "tungstenite",
+]
+
+[[package]]
name = "x509-parser"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/net/Cargo.toml b/net/Cargo.toml
index 0b6534c..fe209cd 100644
--- a/net/Cargo.toml
+++ b/net/Cargo.toml
@@ -15,3 +15,5 @@ bincode = { version="2.0.0-rc.3", features = ["derive"]}
thiserror = "1.0.58"
url = "2.5.0"
futures-rustls = "0.25.1"
+async-tungstenite = "0.25.0"
+ws_stream_tungstenite = "0.13.0"
diff --git a/net/src/error.rs b/net/src/error.rs
index be90a03..6e04a12 100644
--- a/net/src/error.rs
+++ b/net/src/error.rs
@@ -28,6 +28,9 @@ pub enum Error {
#[error(transparent)]
ChannelRecv(#[from] smol::channel::RecvError),
+ #[error("Ws Error: {0}")]
+ WsError(#[from] async_tungstenite::tungstenite::Error),
+
#[error("Tls Error: {0}")]
Rustls(#[from] futures_rustls::rustls::Error),
diff --git a/net/src/lib.rs b/net/src/lib.rs
index 5f1c8a6..c1d72b2 100644
--- a/net/src/lib.rs
+++ b/net/src/lib.rs
@@ -8,7 +8,7 @@ pub use {
connection::{dial, Conn, Connection, ToConn},
endpoint::{Addr, Endpoint, Port},
listener::{listen, ConnListener, Listener, ToListener},
- transports::{tcp, tls, udp, unix},
+ transports::{tcp, tls, udp, unix, ws},
};
use error::{Error, Result};
diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs
index ac23021..14ef6f3 100644
--- a/net/src/transports/mod.rs
+++ b/net/src/transports/mod.rs
@@ -2,3 +2,4 @@ pub mod tcp;
pub mod tls;
pub mod udp;
pub mod unix;
+pub mod ws;
diff --git a/net/src/transports/ws.rs b/net/src/transports/ws.rs
new file mode 100644
index 0000000..eaf3b9b
--- /dev/null
+++ b/net/src/transports/ws.rs
@@ -0,0 +1,112 @@
+use std::net::SocketAddr;
+
+use async_trait::async_trait;
+use smol::{
+ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
+ lock::Mutex,
+ net::{TcpListener, TcpStream},
+};
+
+use ws_stream_tungstenite::WsStream;
+
+use crate::{
+ connection::{Connection, ToConn},
+ endpoint::Endpoint,
+ listener::{ConnListener, ToListener},
+ Error, Result,
+};
+
+/// WS network connection implementation of the [`Connection`] trait.
+pub struct WsConn {
+ inner: TcpStream,
+ read: Mutex<ReadHalf<WsStream<TcpStream>>>,
+ write: Mutex<WriteHalf<WsStream<TcpStream>>>,
+}
+
+impl WsConn {
+ /// Creates a new WsConn
+ pub fn new(inner: TcpStream, conn: WsStream<TcpStream>) -> Self {
+ let (read, write) = split(conn);
+ Self {
+ inner,
+ read: Mutex::new(read),
+ write: Mutex::new(write),
+ }
+ }
+}
+
+#[async_trait]
+impl Connection for WsConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?))
+ }
+
+ async fn read(&self, buf: &mut [u8]) -> Result<usize> {
+ self.read.lock().await.read(buf).await.map_err(Error::from)
+ }
+
+ async fn write(&self, buf: &[u8]) -> Result<usize> {
+ self.write
+ .lock()
+ .await
+ .write(buf)
+ .await
+ .map_err(Error::from)
+ }
+}
+
+/// Ws network listener implementation of the `Listener` [`ConnListener`] trait.
+pub struct WsListener {
+ inner: TcpListener,
+}
+
+#[async_trait]
+impl ConnListener for WsListener {
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?))
+ }
+
+ async fn accept(&self) -> Result<Box<dyn Connection>> {
+ let (stream, _) = self.inner.accept().await?;
+ let conn = async_tungstenite::accept_async(stream.clone()).await?;
+ Ok(Box::new(WsConn::new(stream, WsStream::new(conn))))
+ }
+}
+
+/// Connects to the given WS address and port.
+pub async fn dial(endpoint: &Endpoint) -> Result<WsConn> {
+ let addr = SocketAddr::try_from(endpoint.clone())?;
+ let stream = TcpStream::connect(addr).await?;
+ let (conn, _resp) =
+ async_tungstenite::client_async(endpoint.to_string(), stream.clone()).await?;
+ Ok(WsConn::new(stream, WsStream::new(conn)))
+}
+
+/// Listens on the given WS address and port.
+pub async fn listen(endpoint: &Endpoint) -> Result<WsListener> {
+ let addr = SocketAddr::try_from(endpoint.clone())?;
+ let listener = TcpListener::bind(addr).await?;
+ Ok(WsListener { inner: listener })
+}
+
+impl From<WsListener> for Box<dyn ConnListener> {
+ fn from(listener: WsListener) -> Self {
+ Box::new(listener)
+ }
+}
+
+impl ToConn for WsConn {
+ fn to_conn(self) -> Box<dyn Connection> {
+ Box::new(self)
+ }
+}
+
+impl ToListener for WsListener {
+ fn to_listener(self) -> Box<dyn ConnListener> {
+ self.into()
+ }
+}