diff options
Diffstat (limited to 'net/examples/tcp_codec_tokio/src')
-rw-r--r-- | net/examples/tcp_codec_tokio/src/main.rs | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/net/examples/tcp_codec_tokio/src/main.rs b/net/examples/tcp_codec_tokio/src/main.rs new file mode 100644 index 0000000..9865bfa --- /dev/null +++ b/net/examples/tcp_codec_tokio/src/main.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use karyon_core::async_util::sleep; + +use karyon_net::{ + codec::{Codec, Decoder, Encoder}, + tcp, ConnListener, Connection, Endpoint, Result, +}; + +#[derive(Clone)] +struct NewLineCodec {} + +impl Codec for NewLineCodec { + type Item = String; +} + +impl Encoder for NewLineCodec { + type EnItem = String; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + dst[..src.len()].copy_from_slice(src.as_bytes()); + Ok(src.len()) + } +} + +impl Decoder for NewLineCodec { + type DeItem = String; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + match src.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some((i + 1, String::from_utf8(src[..i].to_vec()).unwrap()))), + None => Ok(None), + } + } +} + +#[tokio::main] +async fn main() { + let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); + + let config = tcp::TcpConfig::default(); + + let listener = tcp::listen(&endpoint, config.clone(), NewLineCodec {}) + .await + .unwrap(); + tokio::spawn(async move { + if let Ok(conn) = listener.accept().await { + loop { + let msg = conn.recv().await.unwrap(); + println!("Receive a message: {:?}", msg); + } + }; + }); + + let conn = tcp::dial(&endpoint, config, NewLineCodec {}).await.unwrap(); + conn.send("hello".to_string()).await.unwrap(); + conn.send(" world\n".to_string()).await.unwrap(); + sleep(Duration::from_secs(1)).await; +} |