From 598f9e2d47da80f2bec2ead9c2fe215eff157936 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 20 Nov 2023 23:15:10 +0300 Subject: jsonrpc: add Codec struct for reading from and writing to the connection --- jsonrpc/src/codec.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 jsonrpc/src/codec.rs (limited to 'jsonrpc/src/codec.rs') diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs new file mode 100644 index 0000000..ea97e54 --- /dev/null +++ b/jsonrpc/src/codec.rs @@ -0,0 +1,100 @@ +use memchr::memchr; + +use karyons_core::async_utils::timeout; +use karyons_net::Conn; + +use crate::{Error, Result}; + +const DEFAULT_BUFFER_SIZE: usize = 1024; +const DEFAULT_MAX_ALLOWED_MSG_SIZE: usize = 1024 * 1024; // 1MB + +// TODO: Add unit tests for Codec's functions. + +/// Represents Codec config +#[derive(Clone)] +pub struct CodecConfig { + pub default_buffer_size: usize, + /// The maximum allowed size to receive a message. If set to zero, there + /// will be no size limit. + pub max_allowed_msg_size: usize, +} + +impl Default for CodecConfig { + fn default() -> Self { + Self { + default_buffer_size: DEFAULT_BUFFER_SIZE, + max_allowed_msg_size: DEFAULT_MAX_ALLOWED_MSG_SIZE, + } + } +} + +pub struct Codec { + conn: Conn, + config: CodecConfig, +} + +impl Codec { + /// Creates a new Codec + pub fn new(conn: Conn, config: CodecConfig) -> Self { + Self { conn, config } + } + + /// Read all bytes into `buffer` until the `0x0` byte or EOF is + /// reached. + /// + /// If successful, this function will return the total number of bytes read. + pub async fn read_until(&self, buffer: &mut Vec) -> Result { + let delim = b'\0'; + + let mut read = 0; + + loop { + let mut tmp_buf = vec![0; self.config.default_buffer_size]; + let n = self.conn.read(&mut tmp_buf).await?; + if n == 0 { + return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); + } + + match memchr(delim, &tmp_buf) { + Some(i) => { + buffer.extend_from_slice(&tmp_buf[..i]); + read += i; + break; + } + None => { + buffer.extend_from_slice(&tmp_buf); + read += tmp_buf.len(); + } + } + + if self.config.max_allowed_msg_size != 0 + && buffer.len() == self.config.max_allowed_msg_size + { + return Err(Error::InvalidMsg( + "Message exceeds the maximum allowed size", + )); + } + } + + Ok(read) + } + + /// Writes an entire buffer into the given connection. + pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { + while !buf.is_empty() { + let n = self.conn.write(buf).await?; + let (_, rest) = std::mem::take(&mut buf).split_at(n); + buf = rest; + + if n == 0 { + return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); + } + } + + Ok(()) + } + + pub async fn read_until_timeout(&self, buffer: &mut Vec, t: u64) -> Result { + timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? + } +} -- cgit v1.2.3