diff options
Diffstat (limited to 'jsonrpc/src/codec.rs')
| -rw-r--r-- | jsonrpc/src/codec.rs | 139 | 
1 files changed, 56 insertions, 83 deletions
| diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index 4a70412..74415c7 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,100 +1,73 @@ -use memchr::memchr; +use async_tungstenite::tungstenite::Message; -use karyon_core::async_util::timeout; -use karyon_net::Conn; +use karyon_net::{ +    codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder}, +    Error, Result, +}; -use crate::{Error, Result}; - -const DEFAULT_BUFFER_SIZE: usize = 1024; -const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB - -// TODO: Add unit tests for Codec's functions. - -/// Represents Codec config  #[derive(Clone)] -pub struct CodecConfig { -    pub default_buffer_size: usize, -    /// The maximum allowed buffer size to receive a message. If set to zero, -    /// there will be no size limit. -    pub max_allowed_buffer_size: usize, -} - -impl Default for CodecConfig { -    fn default() -> Self { -        Self { -            default_buffer_size: DEFAULT_BUFFER_SIZE, -            max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE, -        } -    } -} +pub struct JsonCodec {} -pub struct Codec { -    conn: Conn, -    config: CodecConfig, +impl Codec for JsonCodec { +    type Item = serde_json::Value;  } -impl Codec { -    /// Creates a new Codec -    pub fn new(conn: Conn, config: CodecConfig) -> Self { -        Self { conn, config } +impl Encoder for JsonCodec { +    type EnItem = serde_json::Value; +    fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { +        let msg = match serde_json::to_string(src) { +            Ok(m) => m, +            Err(err) => return Err(Error::Encode(err.to_string())), +        }; +        let buf = msg.as_bytes(); +        dst[..buf.len()].copy_from_slice(buf); +        Ok(buf.len())      } +} -    /// Read all bytes into `buffer` until the `0x0A` byte or EOF is -    /// reached. -    /// -    /// If successful, this function will return the total number of bytes read. -    pub async fn read_until(&self, buffer: &mut Vec<u8>) -> Result<usize> { -        let delim = b'\n'; - -        let mut read = 0; - -        loop { -            let mut tmp_buf = vec![0; self.config.default_buffer_size]; -            let n = self.conn.read(&mut tmp_buf).await?; -            if n == 0 { -                return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); -            } - -            match memchr(delim, &tmp_buf) { -                Some(i) => { -                    buffer.extend_from_slice(&tmp_buf[..=i]); -                    read += i + 1; -                    break; -                } -                None => { -                    buffer.extend_from_slice(&tmp_buf); -                    read += tmp_buf.len(); -                } -            } +impl Decoder for JsonCodec { +    type DeItem = serde_json::Value; +    fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { +        let de = serde_json::Deserializer::from_slice(src); +        let mut iter = de.into_iter::<serde_json::Value>(); -            if self.config.max_allowed_buffer_size != 0 -                && buffer.len() == self.config.max_allowed_buffer_size -            { -                return Err(Error::InvalidMsg( -                    "Message exceeds the maximum allowed size", -                )); -            } -        } +        let item = match iter.next() { +            Some(Ok(item)) => item, +            Some(Err(ref e)) if e.is_eof() => return Ok(None), +            Some(Err(e)) => return Err(Error::Encode(e.to_string())), +            None => return Ok(None), +        }; -        Ok(read) +        Ok(Some((iter.byte_offset(), item)))      } +} -    /// Writes an entire buffer into the given connection. -    pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { -        while !buf.is_empty() { -            let n = self.conn.write(buf).await?; -            let (_, rest) = std::mem::take(&mut buf).split_at(n); -            buf = rest; - -            if n == 0 { -                return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); -            } -        } +#[derive(Clone)] +pub struct WsJsonCodec {} +impl WebSocketCodec for WsJsonCodec { +    type Item = serde_json::Value; +} -        Ok(()) +impl WebSocketEncoder for WsJsonCodec { +    type EnItem = serde_json::Value; +    fn encode(&self, src: &Self::EnItem) -> Result<Message> { +        let msg = match serde_json::to_string(src) { +            Ok(m) => m, +            Err(err) => return Err(Error::Encode(err.to_string())), +        }; +        Ok(Message::Text(msg))      } +} -    pub async fn read_until_with_timeout(&self, buffer: &mut Vec<u8>, t: u64) -> Result<usize> { -        timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? +impl WebSocketDecoder for WsJsonCodec { +    type DeItem = serde_json::Value; +    fn decode(&self, src: &Message) -> Result<Self::DeItem> { +        match src { +            Message::Text(s) => match serde_json::from_str(s) { +                Ok(m) => Ok(m), +                Err(err) => Err(Error::Decode(err.to_string())), +            }, +            _ => Err(Error::Decode("Receive wrong message".to_string())), +        }      }  } | 
