From 4d51e3211740764764a6423f8ead4944e1790341 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 19 Nov 2023 22:19:06 +0300 Subject: karyons jsonrpc implementation --- jsonrpc/src/utils.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 jsonrpc/src/utils.rs (limited to 'jsonrpc/src/utils.rs') diff --git a/jsonrpc/src/utils.rs b/jsonrpc/src/utils.rs new file mode 100644 index 0000000..1f21b7a --- /dev/null +++ b/jsonrpc/src/utils.rs @@ -0,0 +1,63 @@ +use memchr::memchr; + +use karyons_net::Conn; + +use crate::{Error, Result}; + +const DEFAULT_MSG_SIZE: usize = 1024; +const MAX_ALLOWED_MSG_SIZE: usize = 1024 * 1024; // 1MB + +// TODO: Add unit tests for these functions. + +/// Read all bytes into `buffer` until the `0x0` byte or EOF is +/// reached. +/// +/// If successful, this function will return the total number of bytes read. +pub async fn read_until(conn: &Conn, buffer: &mut Vec) -> Result { + let delim = b'\0'; + + let mut read = 0; + + loop { + let mut tmp_buf = [0; DEFAULT_MSG_SIZE]; + let n = conn.read(&mut tmp_buf).await?; + if n == 0 { + return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); + } + + match memchr(delim, &tmp_buf) { + Some(i) => { + buffer.extend_from_slice(&tmp_buf[..i]); + read += i; + break; + } + None => { + buffer.extend_from_slice(&tmp_buf); + read += tmp_buf.len(); + } + } + + if buffer.len() == MAX_ALLOWED_MSG_SIZE { + return Err(Error::InvalidMsg( + "Message exceeds the maximum allowed size", + )); + } + } + + Ok(read) +} + +/// Writes an entire buffer into the given connection. +pub async fn write_all(conn: &Conn, mut buf: &[u8]) -> Result<()> { + while !buf.is_empty() { + let n = conn.write(buf).await?; + let (_, rest) = std::mem::take(&mut buf).split_at(n); + buf = rest; + + if n == 0 { + return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); + } + } + + Ok(()) +} -- cgit v1.2.3