From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- net/src/codec/bytes_codec.rs | 29 +++++++++++++++++++++++++ net/src/codec/length_codec.rs | 49 +++++++++++++++++++++++++++++++++++++++++++ net/src/codec/mod.rs | 25 ++++++++++++++++++++++ net/src/codec/websocket.rs | 23 ++++++++++++++++++++ 4 files changed, 126 insertions(+) create mode 100644 net/src/codec/bytes_codec.rs create mode 100644 net/src/codec/length_codec.rs create mode 100644 net/src/codec/mod.rs create mode 100644 net/src/codec/websocket.rs (limited to 'net/src/codec') diff --git a/net/src/codec/bytes_codec.rs b/net/src/codec/bytes_codec.rs new file mode 100644 index 0000000..b319e53 --- /dev/null +++ b/net/src/codec/bytes_codec.rs @@ -0,0 +1,29 @@ +use crate::{ + codec::{Codec, Decoder, Encoder}, + Result, +}; + +#[derive(Clone)] +pub struct BytesCodec {} +impl Codec for BytesCodec { + type Item = Vec; +} + +impl Encoder for BytesCodec { + type EnItem = Vec; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result { + dst[..src.len()].copy_from_slice(src); + Ok(src.len()) + } +} + +impl Decoder for BytesCodec { + type DeItem = Vec; + fn decode(&self, src: &mut [u8]) -> Result> { + if src.is_empty() { + Ok(None) + } else { + Ok(Some((src.len(), src.to_vec()))) + } + } +} diff --git a/net/src/codec/length_codec.rs b/net/src/codec/length_codec.rs new file mode 100644 index 0000000..76a1679 --- /dev/null +++ b/net/src/codec/length_codec.rs @@ -0,0 +1,49 @@ +use karyon_core::util::{decode, encode_into_slice}; + +use crate::{ + codec::{Codec, Decoder, Encoder}, + Result, +}; + +/// The size of the message length. +const MSG_LENGTH_SIZE: usize = std::mem::size_of::(); + +#[derive(Clone)] +pub struct LengthCodec {} +impl Codec for LengthCodec { + type Item = Vec; +} + +impl Encoder for LengthCodec { + type EnItem = Vec; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result { + let length_buf = &mut [0; MSG_LENGTH_SIZE]; + encode_into_slice(&(src.len() as u32), length_buf)?; + dst[..MSG_LENGTH_SIZE].copy_from_slice(length_buf); + dst[MSG_LENGTH_SIZE..src.len() + MSG_LENGTH_SIZE].copy_from_slice(src); + Ok(src.len() + MSG_LENGTH_SIZE) + } +} + +impl Decoder for LengthCodec { + type DeItem = Vec; + fn decode(&self, src: &mut [u8]) -> Result> { + if src.len() < MSG_LENGTH_SIZE { + return Ok(None); + } + + let mut length = [0; MSG_LENGTH_SIZE]; + length.copy_from_slice(&src[..MSG_LENGTH_SIZE]); + let (length, _) = decode::(&length)?; + let length = length as usize; + + if src.len() - MSG_LENGTH_SIZE >= length { + Ok(Some(( + length + MSG_LENGTH_SIZE, + src[MSG_LENGTH_SIZE..length + MSG_LENGTH_SIZE].to_vec(), + ))) + } else { + Ok(None) + } + } +} diff --git a/net/src/codec/mod.rs b/net/src/codec/mod.rs new file mode 100644 index 0000000..565cb07 --- /dev/null +++ b/net/src/codec/mod.rs @@ -0,0 +1,25 @@ +mod bytes_codec; +mod length_codec; +mod websocket; + +pub use bytes_codec::BytesCodec; +pub use length_codec::LengthCodec; +pub use websocket::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder}; + +use crate::Result; + +pub trait Codec: + Decoder + Encoder + Send + Sync + 'static + Unpin +{ + type Item: Send + Sync; +} + +pub trait Encoder { + type EnItem; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result; +} + +pub trait Decoder { + type DeItem; + fn decode(&self, src: &mut [u8]) -> Result>; +} diff --git a/net/src/codec/websocket.rs b/net/src/codec/websocket.rs new file mode 100644 index 0000000..b59a55c --- /dev/null +++ b/net/src/codec/websocket.rs @@ -0,0 +1,23 @@ +use crate::Result; +use async_tungstenite::tungstenite::Message; + +pub trait WebSocketCodec: + WebSocketDecoder + + WebSocketEncoder + + Send + + Sync + + 'static + + Unpin +{ + type Item: Send + Sync; +} + +pub trait WebSocketEncoder { + type EnItem; + fn encode(&self, src: &Self::EnItem) -> Result; +} + +pub trait WebSocketDecoder { + type DeItem; + fn decode(&self, src: &Message) -> Result; +} -- cgit v1.2.3