aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/codec.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/codec.rs')
-rw-r--r--jsonrpc/src/codec.rs139
1 files changed, 56 insertions, 83 deletions
diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs
index 4a70412..74415c7 100644
--- a/jsonrpc/src/codec.rs
+++ b/jsonrpc/src/codec.rs
@@ -1,100 +1,73 @@
-use memchr::memchr;
+use async_tungstenite::tungstenite::Message;
-use karyon_core::async_util::timeout;
-use karyon_net::Conn;
+use karyon_net::{
+ codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder},
+ Error, Result,
+};
-use crate::{Error, Result};
-
-const DEFAULT_BUFFER_SIZE: usize = 1024;
-const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB
-
-// TODO: Add unit tests for Codec's functions.
-
-/// Represents Codec config
#[derive(Clone)]
-pub struct CodecConfig {
- pub default_buffer_size: usize,
- /// The maximum allowed buffer size to receive a message. If set to zero,
- /// there will be no size limit.
- pub max_allowed_buffer_size: usize,
-}
-
-impl Default for CodecConfig {
- fn default() -> Self {
- Self {
- default_buffer_size: DEFAULT_BUFFER_SIZE,
- max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE,
- }
- }
-}
+pub struct JsonCodec {}
-pub struct Codec {
- conn: Conn,
- config: CodecConfig,
+impl Codec for JsonCodec {
+ type Item = serde_json::Value;
}
-impl Codec {
- /// Creates a new Codec
- pub fn new(conn: Conn, config: CodecConfig) -> Self {
- Self { conn, config }
+impl Encoder for JsonCodec {
+ type EnItem = serde_json::Value;
+ fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> {
+ let msg = match serde_json::to_string(src) {
+ Ok(m) => m,
+ Err(err) => return Err(Error::Encode(err.to_string())),
+ };
+ let buf = msg.as_bytes();
+ dst[..buf.len()].copy_from_slice(buf);
+ Ok(buf.len())
}
+}
- /// Read all bytes into `buffer` until the `0x0A` byte or EOF is
- /// reached.
- ///
- /// If successful, this function will return the total number of bytes read.
- pub async fn read_until(&self, buffer: &mut Vec<u8>) -> Result<usize> {
- let delim = b'\n';
-
- let mut read = 0;
-
- loop {
- let mut tmp_buf = vec![0; self.config.default_buffer_size];
- let n = self.conn.read(&mut tmp_buf).await?;
- if n == 0 {
- return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into()));
- }
-
- match memchr(delim, &tmp_buf) {
- Some(i) => {
- buffer.extend_from_slice(&tmp_buf[..=i]);
- read += i + 1;
- break;
- }
- None => {
- buffer.extend_from_slice(&tmp_buf);
- read += tmp_buf.len();
- }
- }
+impl Decoder for JsonCodec {
+ type DeItem = serde_json::Value;
+ fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> {
+ let de = serde_json::Deserializer::from_slice(src);
+ let mut iter = de.into_iter::<serde_json::Value>();
- if self.config.max_allowed_buffer_size != 0
- && buffer.len() == self.config.max_allowed_buffer_size
- {
- return Err(Error::InvalidMsg(
- "Message exceeds the maximum allowed size",
- ));
- }
- }
+ let item = match iter.next() {
+ Some(Ok(item)) => item,
+ Some(Err(ref e)) if e.is_eof() => return Ok(None),
+ Some(Err(e)) => return Err(Error::Encode(e.to_string())),
+ None => return Ok(None),
+ };
- Ok(read)
+ Ok(Some((iter.byte_offset(), item)))
}
+}
- /// Writes an entire buffer into the given connection.
- pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> {
- while !buf.is_empty() {
- let n = self.conn.write(buf).await?;
- let (_, rest) = std::mem::take(&mut buf).split_at(n);
- buf = rest;
-
- if n == 0 {
- return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into()));
- }
- }
+#[derive(Clone)]
+pub struct WsJsonCodec {}
+impl WebSocketCodec for WsJsonCodec {
+ type Item = serde_json::Value;
+}
- Ok(())
+impl WebSocketEncoder for WsJsonCodec {
+ type EnItem = serde_json::Value;
+ fn encode(&self, src: &Self::EnItem) -> Result<Message> {
+ let msg = match serde_json::to_string(src) {
+ Ok(m) => m,
+ Err(err) => return Err(Error::Encode(err.to_string())),
+ };
+ Ok(Message::Text(msg))
}
+}
- pub async fn read_until_with_timeout(&self, buffer: &mut Vec<u8>, t: u64) -> Result<usize> {
- timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await?
+impl WebSocketDecoder for WsJsonCodec {
+ type DeItem = serde_json::Value;
+ fn decode(&self, src: &Message) -> Result<Self::DeItem> {
+ match src {
+ Message::Text(s) => match serde_json::from_str(s) {
+ Ok(m) => Ok(m),
+ Err(err) => Err(Error::Decode(err.to_string())),
+ },
+ _ => Err(Error::Decode("Receive wrong message".to_string())),
+ }
}
}