From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- jsonrpc/src/codec.rs | 139 +++++++++++++++++++++------------------------------ 1 file changed, 56 insertions(+), 83 deletions(-) (limited to 'jsonrpc/src/codec.rs') diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index 4a70412..74415c7 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,100 +1,73 @@ -use memchr::memchr; +use async_tungstenite::tungstenite::Message; -use karyon_core::async_util::timeout; -use karyon_net::Conn; +use karyon_net::{ + codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder}, + Error, Result, +}; -use crate::{Error, Result}; - -const DEFAULT_BUFFER_SIZE: usize = 1024; -const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB - -// TODO: Add unit tests for Codec's functions. - -/// Represents Codec config #[derive(Clone)] -pub struct CodecConfig { - pub default_buffer_size: usize, - /// The maximum allowed buffer size to receive a message. If set to zero, - /// there will be no size limit. - pub max_allowed_buffer_size: usize, -} - -impl Default for CodecConfig { - fn default() -> Self { - Self { - default_buffer_size: DEFAULT_BUFFER_SIZE, - max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE, - } - } -} +pub struct JsonCodec {} -pub struct Codec { - conn: Conn, - config: CodecConfig, +impl Codec for JsonCodec { + type Item = serde_json::Value; } -impl Codec { - /// Creates a new Codec - pub fn new(conn: Conn, config: CodecConfig) -> Self { - Self { conn, config } +impl Encoder for JsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + let buf = msg.as_bytes(); + dst[..buf.len()].copy_from_slice(buf); + Ok(buf.len()) } +} - /// Read all bytes into `buffer` until the `0x0A` byte or EOF is - /// reached. - /// - /// If successful, this function will return the total number of bytes read. - pub async fn read_until(&self, buffer: &mut Vec) -> Result { - let delim = b'\n'; - - let mut read = 0; - - loop { - let mut tmp_buf = vec![0; self.config.default_buffer_size]; - let n = self.conn.read(&mut tmp_buf).await?; - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - - match memchr(delim, &tmp_buf) { - Some(i) => { - buffer.extend_from_slice(&tmp_buf[..=i]); - read += i + 1; - break; - } - None => { - buffer.extend_from_slice(&tmp_buf); - read += tmp_buf.len(); - } - } +impl Decoder for JsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &mut [u8]) -> Result> { + let de = serde_json::Deserializer::from_slice(src); + let mut iter = de.into_iter::(); - if self.config.max_allowed_buffer_size != 0 - && buffer.len() == self.config.max_allowed_buffer_size - { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size", - )); - } - } + let item = match iter.next() { + Some(Ok(item)) => item, + Some(Err(ref e)) if e.is_eof() => return Ok(None), + Some(Err(e)) => return Err(Error::Encode(e.to_string())), + None => return Ok(None), + }; - Ok(read) + Ok(Some((iter.byte_offset(), item))) } +} - /// Writes an entire buffer into the given connection. - pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.write(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at(n); - buf = rest; - - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - } +#[derive(Clone)] +pub struct WsJsonCodec {} +impl WebSocketCodec for WsJsonCodec { + type Item = serde_json::Value; +} - Ok(()) +impl WebSocketEncoder for WsJsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem) -> Result { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + Ok(Message::Text(msg)) } +} - pub async fn read_until_with_timeout(&self, buffer: &mut Vec, t: u64) -> Result { - timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? +impl WebSocketDecoder for WsJsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &Message) -> Result { + match src { + Message::Text(s) => match serde_json::from_str(s) { + Ok(m) => Ok(m), + Err(err) => Err(Error::Decode(err.to_string())), + }, + _ => Err(Error::Decode("Receive wrong message".to_string())), + } } } -- cgit v1.2.3