aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2024-03-14 17:01:59 +0100
committerhozan23 <hozan23@proton.me>2024-03-14 17:03:19 +0100
commit340957fec147f4429796413f27bbd9b84ba6f141 (patch)
tree72864defbb3f87550e801391b378957b06addf48 /net
parent4923c76c957af8391a7542288943a1399067215c (diff)
net: add support for websocket protocol
Diffstat (limited to 'net')
-rw-r--r--net/Cargo.toml2
-rw-r--r--net/src/error.rs3
-rw-r--r--net/src/lib.rs2
-rw-r--r--net/src/transports/mod.rs1
-rw-r--r--net/src/transports/ws.rs112
5 files changed, 119 insertions, 1 deletions
diff --git a/net/Cargo.toml b/net/Cargo.toml
index 0b6534c..fe209cd 100644
--- a/net/Cargo.toml
+++ b/net/Cargo.toml
@@ -15,3 +15,5 @@ bincode = { version="2.0.0-rc.3", features = ["derive"]}
thiserror = "1.0.58"
url = "2.5.0"
futures-rustls = "0.25.1"
+async-tungstenite = "0.25.0"
+ws_stream_tungstenite = "0.13.0"
diff --git a/net/src/error.rs b/net/src/error.rs
index be90a03..6e04a12 100644
--- a/net/src/error.rs
+++ b/net/src/error.rs
@@ -28,6 +28,9 @@ pub enum Error {
#[error(transparent)]
ChannelRecv(#[from] smol::channel::RecvError),
+ #[error("Ws Error: {0}")]
+ WsError(#[from] async_tungstenite::tungstenite::Error),
+
#[error("Tls Error: {0}")]
Rustls(#[from] futures_rustls::rustls::Error),
diff --git a/net/src/lib.rs b/net/src/lib.rs
index 5f1c8a6..c1d72b2 100644
--- a/net/src/lib.rs
+++ b/net/src/lib.rs
@@ -8,7 +8,7 @@ pub use {
connection::{dial, Conn, Connection, ToConn},
endpoint::{Addr, Endpoint, Port},
listener::{listen, ConnListener, Listener, ToListener},
- transports::{tcp, tls, udp, unix},
+ transports::{tcp, tls, udp, unix, ws},
};
use error::{Error, Result};
diff --git a/net/src/transports/mod.rs b/net/src/transports/mod.rs
index ac23021..14ef6f3 100644
--- a/net/src/transports/mod.rs
+++ b/net/src/transports/mod.rs
@@ -2,3 +2,4 @@ pub mod tcp;
pub mod tls;
pub mod udp;
pub mod unix;
+pub mod ws;
diff --git a/net/src/transports/ws.rs b/net/src/transports/ws.rs
new file mode 100644
index 0000000..eaf3b9b
--- /dev/null
+++ b/net/src/transports/ws.rs
@@ -0,0 +1,112 @@
+use std::net::SocketAddr;
+
+use async_trait::async_trait;
+use smol::{
+ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
+ lock::Mutex,
+ net::{TcpListener, TcpStream},
+};
+
+use ws_stream_tungstenite::WsStream;
+
+use crate::{
+ connection::{Connection, ToConn},
+ endpoint::Endpoint,
+ listener::{ConnListener, ToListener},
+ Error, Result,
+};
+
+/// WS network connection implementation of the [`Connection`] trait.
+pub struct WsConn {
+ inner: TcpStream,
+ read: Mutex<ReadHalf<WsStream<TcpStream>>>,
+ write: Mutex<WriteHalf<WsStream<TcpStream>>>,
+}
+
+impl WsConn {
+ /// Creates a new WsConn
+ pub fn new(inner: TcpStream, conn: WsStream<TcpStream>) -> Self {
+ let (read, write) = split(conn);
+ Self {
+ inner,
+ read: Mutex::new(read),
+ write: Mutex::new(write),
+ }
+ }
+}
+
+#[async_trait]
+impl Connection for WsConn {
+ fn peer_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.peer_addr()?))
+ }
+
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?))
+ }
+
+ async fn read(&self, buf: &mut [u8]) -> Result<usize> {
+ self.read.lock().await.read(buf).await.map_err(Error::from)
+ }
+
+ async fn write(&self, buf: &[u8]) -> Result<usize> {
+ self.write
+ .lock()
+ .await
+ .write(buf)
+ .await
+ .map_err(Error::from)
+ }
+}
+
+/// Ws network listener implementation of the `Listener` [`ConnListener`] trait.
+pub struct WsListener {
+ inner: TcpListener,
+}
+
+#[async_trait]
+impl ConnListener for WsListener {
+ fn local_endpoint(&self) -> Result<Endpoint> {
+ Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?))
+ }
+
+ async fn accept(&self) -> Result<Box<dyn Connection>> {
+ let (stream, _) = self.inner.accept().await?;
+ let conn = async_tungstenite::accept_async(stream.clone()).await?;
+ Ok(Box::new(WsConn::new(stream, WsStream::new(conn))))
+ }
+}
+
+/// Connects to the given WS address and port.
+pub async fn dial(endpoint: &Endpoint) -> Result<WsConn> {
+ let addr = SocketAddr::try_from(endpoint.clone())?;
+ let stream = TcpStream::connect(addr).await?;
+ let (conn, _resp) =
+ async_tungstenite::client_async(endpoint.to_string(), stream.clone()).await?;
+ Ok(WsConn::new(stream, WsStream::new(conn)))
+}
+
+/// Listens on the given WS address and port.
+pub async fn listen(endpoint: &Endpoint) -> Result<WsListener> {
+ let addr = SocketAddr::try_from(endpoint.clone())?;
+ let listener = TcpListener::bind(addr).await?;
+ Ok(WsListener { inner: listener })
+}
+
+impl From<WsListener> for Box<dyn ConnListener> {
+ fn from(listener: WsListener) -> Self {
+ Box::new(listener)
+ }
+}
+
+impl ToConn for WsConn {
+ fn to_conn(self) -> Box<dyn Connection> {
+ Box::new(self)
+ }
+}
+
+impl ToListener for WsListener {
+ fn to_listener(self) -> Box<dyn ConnListener> {
+ self.into()
+ }
+}