From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- jsonrpc/Cargo.toml | 45 +++++--- jsonrpc/README.md | 73 ++++++++----- jsonrpc/examples/client.py | 10 +- jsonrpc/examples/client.rs | 11 +- jsonrpc/examples/server.rs | 34 +++--- jsonrpc/jsonrpc_internal/Cargo.toml | 18 ++++ jsonrpc/jsonrpc_internal/src/error.rs | 40 +++++++ jsonrpc/jsonrpc_internal/src/lib.rs | 65 ++++++++++++ jsonrpc/jsonrpc_macro/Cargo.toml | 24 +++++ jsonrpc/jsonrpc_macro/src/lib.rs | 47 +++++++++ jsonrpc/src/client.rs | 155 ++++++++++++++++++++------- jsonrpc/src/codec.rs | 139 ++++++++++-------------- jsonrpc/src/error.rs | 34 ------ jsonrpc/src/lib.rs | 65 ++++++------ jsonrpc/src/message.rs | 2 + jsonrpc/src/server.rs | 193 +++++++++++++++++++++++----------- jsonrpc/src/service.rs | 64 ----------- jsonrpc/tests/impl_rpc_service.rs | 27 +++++ jsonrpc/tests/rpc_impl.rs | 26 +++++ 19 files changed, 697 insertions(+), 375 deletions(-) create mode 100644 jsonrpc/jsonrpc_internal/Cargo.toml create mode 100644 jsonrpc/jsonrpc_internal/src/error.rs create mode 100644 jsonrpc/jsonrpc_internal/src/lib.rs create mode 100644 jsonrpc/jsonrpc_macro/Cargo.toml create mode 100644 jsonrpc/jsonrpc_macro/src/lib.rs delete mode 100644 jsonrpc/src/error.rs delete mode 100644 jsonrpc/src/service.rs create mode 100644 jsonrpc/tests/impl_rpc_service.rs create mode 100644 jsonrpc/tests/rpc_impl.rs (limited to 'jsonrpc') diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 73ae275..e81ec10 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -1,28 +1,49 @@ [package] name = "karyon_jsonrpc" -version.workspace = true +version.workspace = true edition.workspace = true -autoexamples = false + +[features] +default = ["smol"] +smol = [ + "karyon_core/smol", + "karyon_net/smol", + "karyon_jsonrpc_internal/smol", + "karyon_jsonrpc_macro/smol", + "dep:futures-rustls", + "async-tungstenite/async-std-runtime", +] +tokio = [ + "karyon_core/tokio", + "karyon_net/tokio", + "karyon_jsonrpc_internal/tokio", + "karyon_jsonrpc_macro/tokio", + "async-tungstenite/tokio-runtime", + "dep:tokio-rustls", +] [dependencies] -karyon_core.workspace = true -karyon_net.workspace = true +karyon_core = { workspace = true, default-features = false } +karyon_net = { workspace = true, default-features = false } + +karyon_jsonrpc_macro = { path = "jsonrpc_macro", default-features = false } +karyon_jsonrpc_internal = { path = "jsonrpc_internal", default-features = false } -smol = "2.0.0" log = "0.4.21" rand = "0.8.5" +async-tungstenite = { version = "0.25.0", default-features = false } serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" thiserror = "1.0.58" memchr = "2.7.1" +async-trait = "0.1.77" -[[example]] -name = "server" -path = "examples/server.rs" - -[[example]] -name = "client" -path = "examples/client.rs" +futures-rustls = { version = "0.25.1", optional = true } +tokio-rustls = { version = "0.26.0", optional = true } [dev-dependencies] env_logger = "0.11.3" +rcgen = "0.13.1" +rustls-pemfile = "2.1.2" +smol = "2.0.0" +webpki-roots = "0.26.1" diff --git a/jsonrpc/README.md b/jsonrpc/README.md index af7dfe2..98c18e1 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -1,52 +1,73 @@ # karyon jsonrpc A fast and lightweight async implementation of [JSON-RPC -2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. +2.0](https://www.jsonrpc.org/specification). + +features: +- Supports TCP, TLS, WebSocket, and Unix protocols. +- Uses smol(async-std) as the async runtime, but also supports tokio via the + `tokio` feature. +- Allows registration of multiple services (structs) of different types on a + single server. ## Example -```rust +``` use std::sync::Arc; use serde_json::Value; use smol::net::{TcpStream, TcpListener}; -use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; +use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; struct HelloWorld {} +#[rpc_impl] impl HelloWorld { - async fn say_hello(&self, params: Value) -> Result { + async fn say_hello(&self, params: Value) -> Result { let msg: String = serde_json::from_value(params)?; Ok(serde_json::json!(format!("Hello {msg}!"))) } -} -let ex = Arc::new(smol::Executor::new()); + async fn foo(&self, params: Value) -> Result { + Ok(serde_json::json!("foo!")) + } -////////////////// -// Server -////////////////// -// Creates a new server -let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); -let config = ServerConfig::default(); -let server = Server::new(listener, config, ex.clone()); + async fn bar(&self, params: Value) -> Result { + Ok(serde_json::json!("bar!")) + } +} -// Register the HelloWorld service -register_service!(HelloWorld, say_hello); -server.attach_service(HelloWorld{}); +// Server +async { + // Creates a new server + let server = Server::builder("tcp://127.0.0.1:60000") + .expect("create new server builder") + .service(HelloWorld{}) + .build() + .await + .expect("build the server"); -// Starts the server -ex.run(server.start()); + // Starts the server + server.start().await.expect("start the server"); +}; -////////////////// -// Client -////////////////// -// Creates a new client -let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); -let config = ClientConfig::default(); -let client = Client::new(conn, config); +// Client +async { + // Creates a new client + let client = Client::builder("tcp://127.0.0.1:60000") + .expect("create new client builder") + .build() + .await + .expect("build the client"); -let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); + let result: String = client.call("HelloWorld.say_hello", "world".to_string()) + .await + .expect("send a request"); +}; ``` + + + + diff --git a/jsonrpc/examples/client.py b/jsonrpc/examples/client.py index 2066e82..745d5db 100644 --- a/jsonrpc/examples/client.py +++ b/jsonrpc/examples/client.py @@ -3,7 +3,7 @@ import random import json HOST = "127.0.0.1" -PORT = 60000 +PORT = 6000 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) @@ -15,7 +15,7 @@ req = { "params": {"x": 4, "y": 3}, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -27,7 +27,7 @@ req = { "params": {"x": 4, "y": 3}, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -39,7 +39,7 @@ req = { "params": None, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -51,7 +51,7 @@ req = { "params": None, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs index 2c8cf83..3289772 100644 --- a/jsonrpc/examples/client.rs +++ b/jsonrpc/examples/client.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; -use smol::net::TcpStream; -use karyon_jsonrpc::{Client, ClientConfig}; +use karyon_jsonrpc::Client; #[derive(Deserialize, Serialize)] struct Req { @@ -15,9 +14,11 @@ struct Pong {} fn main() { env_logger::init(); smol::future::block_on(async { - let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); - let config = ClientConfig::default(); - let client = Client::new(conn, config); + let client = Client::builder("tcp://127.0.0.1:6000") + .expect("Create client builder") + .build() + .await + .unwrap(); let params = Req { x: 10, y: 7 }; let result: u32 = client.call("Calc.add", params).await.unwrap(); diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 6953433..841e276 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -1,10 +1,7 @@ -use std::sync::Arc; - use serde::{Deserialize, Serialize}; use serde_json::Value; -use smol::net::TcpListener; -use karyon_jsonrpc::{register_service, JsonRPCError, Server, ServerConfig}; +use karyon_jsonrpc::{rpc_impl, Error, Server}; struct Calc { version: String, @@ -19,43 +16,44 @@ struct Req { #[derive(Deserialize, Serialize)] struct Pong {} +#[rpc_impl] impl Calc { - async fn ping(&self, _params: Value) -> Result { + async fn ping(&self, _params: Value) -> Result { Ok(serde_json::json!(Pong {})) } - async fn add(&self, params: Value) -> Result { + async fn add(&self, params: Value) -> Result { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x + params.y)) } - async fn sub(&self, params: Value) -> Result { + async fn sub(&self, params: Value) -> Result { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x - params.y)) } - async fn version(&self, _params: Value) -> Result { + async fn version(&self, _params: Value) -> Result { Ok(serde_json::json!(self.version)) } } fn main() { env_logger::init(); - let ex = Arc::new(smol::Executor::new()); - smol::block_on(ex.clone().run(async { - // Creates a new server - let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); - let config = ServerConfig::default(); - let server = Server::new(listener, config, ex); - + smol::block_on(async { // Register the Calc service - register_service!(Calc, ping, add, sub, version); let calc = Calc { version: String::from("0.1"), }; - server.attach_service(calc).await; + + // Creates a new server + let server = Server::builder("tcp://127.0.0.1:6000") + .expect("Create a new server builder") + .service(calc) + .build() + .await + .expect("start a new server"); // Start the server server.start().await.unwrap(); - })); + }); } diff --git a/jsonrpc/jsonrpc_internal/Cargo.toml b/jsonrpc/jsonrpc_internal/Cargo.toml new file mode 100644 index 0000000..5a3acc4 --- /dev/null +++ b/jsonrpc/jsonrpc_internal/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "karyon_jsonrpc_internal" +version.workspace = true +edition.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["smol"] +smol = ["karyon_core/smol", "karyon_net/smol"] +tokio = ["karyon_core/tokio", "karyon_net/tokio"] + +[dependencies] +karyon_core = { workspace = true, default-features = false } +karyon_net = { workspace = true, default-features = false } + +serde_json = "1.0.114" +thiserror = "1.0.58" diff --git a/jsonrpc/jsonrpc_internal/src/error.rs b/jsonrpc/jsonrpc_internal/src/error.rs new file mode 100644 index 0000000..7f89729 --- /dev/null +++ b/jsonrpc/jsonrpc_internal/src/error.rs @@ -0,0 +1,40 @@ +use thiserror::Error as ThisError; + +pub type Result = std::result::Result; + +/// Represents karyon's jsonrpc Error. +#[derive(ThisError, Debug)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error("Call Error: code: {0} msg: {1}")] + CallError(i32, String), + + #[error("RPC Method Error: code: {0} msg: {1}")] + RPCMethodError(i32, &'static str), + + #[error("Invalid Params: {0}")] + InvalidParams(&'static str), + + #[error("Invalid Request: {0}")] + InvalidRequest(&'static str), + + #[error(transparent)] + ParseJSON(#[from] serde_json::Error), + + #[error("Invalid Message Error: {0}")] + InvalidMsg(&'static str), + + #[error("Unsupported protocol: {0}")] + UnsupportedProtocol(String), + + #[error("Unexpected Error: {0}")] + General(&'static str), + + #[error(transparent)] + KaryonCore(#[from] karyon_core::error::Error), + + #[error(transparent)] + KaryonNet(#[from] karyon_net::Error), +} diff --git a/jsonrpc/jsonrpc_internal/src/lib.rs b/jsonrpc/jsonrpc_internal/src/lib.rs new file mode 100644 index 0000000..95af82a --- /dev/null +++ b/jsonrpc/jsonrpc_internal/src/lib.rs @@ -0,0 +1,65 @@ +mod error; +use std::{future::Future, pin::Pin}; + +pub use error::{Error, Result}; + +/// Represents the RPC method +pub type RPCMethod<'a> = Box RPCMethodOutput<'a> + Send + 'a>; +type RPCMethodOutput<'a> = + Pin> + Send + Sync + 'a>>; + +/// Defines the interface for an RPC service. +pub trait RPCService: Sync + Send { + fn get_method<'a>(&'a self, name: &'a str) -> Option; + fn name(&self) -> String; +} + +/// Implements the [`RPCService`] trait for a provided type. +/// +/// # Example +/// +/// ``` +/// use serde_json::Value; +/// +/// use karyon_jsonrpc_internal::{Error, impl_rpc_service}; +/// +/// struct Hello {} +/// +/// impl Hello { +/// async fn foo(&self, params: Value) -> Result { +/// Ok(serde_json::json!("foo!")) +/// } +/// +/// async fn bar(&self, params: Value) -> Result { +/// Ok(serde_json::json!("bar!")) +/// } +/// } +/// +/// impl_rpc_service!(Hello, foo, bar); +/// +/// ``` +#[macro_export] +macro_rules! impl_rpc_service { + ($t:ty, $($m:ident),*) => { + impl karyon_jsonrpc_internal::RPCService for $t { + fn get_method<'a>( + &'a self, + name: &'a str + ) -> Option { + match name { + $( + stringify!($m) => { + Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) + } + )* + _ => None, + } + + + } + fn name(&self) -> String{ + stringify!($t).to_string() + } + } + }; +} diff --git a/jsonrpc/jsonrpc_macro/Cargo.toml b/jsonrpc/jsonrpc_macro/Cargo.toml new file mode 100644 index 0000000..17140c5 --- /dev/null +++ b/jsonrpc/jsonrpc_macro/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "karyon_jsonrpc_macro" +version.workspace = true +edition.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +proc-macro = true + +[features] +default = ["smol"] +smol = ["karyon_jsonrpc_internal/smol"] +tokio = ["karyon_jsonrpc_internal/tokio"] + +[dependencies] +karyon_jsonrpc_internal = { path = "../jsonrpc_internal", default-features = false } + +proc-macro2 = "1.0" +quote = "1.0" +syn = { version = "1.0", features = ["full"] } + +serde_json = "1.0.114" + diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs new file mode 100644 index 0000000..f2015d4 --- /dev/null +++ b/jsonrpc/jsonrpc_macro/src/lib.rs @@ -0,0 +1,47 @@ +use proc_macro::TokenStream; +use proc_macro2::{Ident, TokenStream as TokenStream2}; +use quote::quote; +use syn::{parse_macro_input, spanned::Spanned, ImplItem, ItemImpl, Type}; + +macro_rules! err { + ($($tt:tt)*) => { + return syn::Error::new($($tt)*).to_compile_error().into() + }; +} + +#[proc_macro_attribute] +pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let mut methods: Vec = vec![]; + + let item2 = item.clone(); + let parsed_input = parse_macro_input!(item2 as ItemImpl); + + let self_ty = match *parsed_input.self_ty { + Type::Path(p) => p, + _ => err!( + parsed_input.span(), + "implementing the trait `RPCService` on this type is unsupported" + ), + }; + + if parsed_input.items.is_empty() { + err!(self_ty.span(), "At least one method should be implemented"); + } + + for item in parsed_input.items { + match item { + ImplItem::Method(method) => { + methods.push(method.sig.ident); + } + _ => err!(item.span(), "unexpected item"), + } + } + + let item2: TokenStream2 = item.into(); + let quoted = quote! { + karyon_jsonrpc_internal::impl_rpc_service!(#self_ty, #(#methods),*); + #item2 + }; + + quoted.into() +} diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index efbaf50..50d772b 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -1,37 +1,32 @@ +use std::time::Duration; + use log::debug; use serde::{de::DeserializeOwned, Serialize}; -use karyon_core::util::random_32; -use karyon_net::ToConn; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use crate::{ - codec::{Codec, CodecConfig}, - message, Error, Result, JSONRPC_VERSION, +use karyon_core::{async_util::timeout, util::random_32}; +use karyon_net::{ + tls::ClientTlsConfig, + ws::{ClientWsConfig, ClientWssConfig}, + Conn, Endpoint, ToEndpoint, }; -/// Represents client config -#[derive(Default)] -pub struct ClientConfig { - pub timeout: Option, -} +use crate::{ + codec::{JsonCodec, WsJsonCodec}, + message, Error, Result, +}; /// Represents an RPC client pub struct Client { - codec: Codec, - config: ClientConfig, + conn: Conn, + timeout: Option, } impl Client { - /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection. - pub fn new(conn: C, config: ClientConfig) -> Self { - let codec_config = CodecConfig { - max_allowed_buffer_size: 0, - ..Default::default() - }; - let codec = Codec::new(conn.to_conn(), codec_config); - Self { codec, config } - } - /// Calls the provided method, waits for the response, and returns the result. pub async fn call( &self, @@ -41,38 +36,122 @@ impl Client { let id = serde_json::json!(random_32()); let request = message::Request { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), id, method: method.to_string(), params: serde_json::json!(params), }; - let mut payload = serde_json::to_vec(&request)?; - payload.push(b'\n'); - self.codec.write_all(&payload).await?; + let req_json = serde_json::to_value(&request)?; + match self.timeout { + Some(s) => { + let dur = Duration::from_secs(s); + timeout(dur, self.conn.send(req_json)).await??; + } + None => { + self.conn.send(req_json).await?; + } + } debug!("--> {request}"); - let mut buffer = vec![]; - if let Some(t) = self.config.timeout { - self.codec.read_until_with_timeout(&mut buffer, t).await?; - } else { - self.codec.read_until(&mut buffer).await?; - }; - - let response = serde_json::from_slice::(&buffer)?; + let msg = self.conn.recv().await?; + let response = serde_json::from_value::(msg)?; debug!("<-- {response}"); - if let Some(error) = response.error { - return Err(Error::CallError(error.code, error.message)); - } - if response.id.is_none() || response.id.unwrap() != request.id { return Err(Error::InvalidMsg("Invalid response id")); } + if let Some(error) = response.error { + return Err(Error::CallError(error.code, error.message)); + } + match response.result { Some(result) => Ok(serde_json::from_value::(result)?), None => Err(Error::InvalidMsg("Invalid response result")), } } } + +pub struct ClientBuilder { + endpoint: Endpoint, + tls_config: Option<(rustls::ClientConfig, String)>, + timeout: Option, +} + +impl ClientBuilder { + pub fn with_timeout(mut self, timeout: u64) -> Self { + self.timeout = Some(timeout); + self + } + + pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some((config, dns_name.to_string())); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result { + let conn: Conn = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.endpoint, + ClientTlsConfig { + dns_name, + client_config: conf, + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &self.endpoint, + ClientWsConfig { + tcp_config: Default::default(), + wss_config: Some(ClientWssConfig { + dns_name, + client_config: conf, + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new( + karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + Ok(Client { + timeout: self.timeout, + conn, + }) + } +} +impl Client { + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ClientBuilder { + endpoint, + timeout: None, + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index 4a70412..74415c7 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,100 +1,73 @@ -use memchr::memchr; +use async_tungstenite::tungstenite::Message; -use karyon_core::async_util::timeout; -use karyon_net::Conn; +use karyon_net::{ + codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder}, + Error, Result, +}; -use crate::{Error, Result}; - -const DEFAULT_BUFFER_SIZE: usize = 1024; -const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB - -// TODO: Add unit tests for Codec's functions. - -/// Represents Codec config #[derive(Clone)] -pub struct CodecConfig { - pub default_buffer_size: usize, - /// The maximum allowed buffer size to receive a message. If set to zero, - /// there will be no size limit. - pub max_allowed_buffer_size: usize, -} - -impl Default for CodecConfig { - fn default() -> Self { - Self { - default_buffer_size: DEFAULT_BUFFER_SIZE, - max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE, - } - } -} +pub struct JsonCodec {} -pub struct Codec { - conn: Conn, - config: CodecConfig, +impl Codec for JsonCodec { + type Item = serde_json::Value; } -impl Codec { - /// Creates a new Codec - pub fn new(conn: Conn, config: CodecConfig) -> Self { - Self { conn, config } +impl Encoder for JsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + let buf = msg.as_bytes(); + dst[..buf.len()].copy_from_slice(buf); + Ok(buf.len()) } +} - /// Read all bytes into `buffer` until the `0x0A` byte or EOF is - /// reached. - /// - /// If successful, this function will return the total number of bytes read. - pub async fn read_until(&self, buffer: &mut Vec) -> Result { - let delim = b'\n'; - - let mut read = 0; - - loop { - let mut tmp_buf = vec![0; self.config.default_buffer_size]; - let n = self.conn.read(&mut tmp_buf).await?; - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - - match memchr(delim, &tmp_buf) { - Some(i) => { - buffer.extend_from_slice(&tmp_buf[..=i]); - read += i + 1; - break; - } - None => { - buffer.extend_from_slice(&tmp_buf); - read += tmp_buf.len(); - } - } +impl Decoder for JsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &mut [u8]) -> Result> { + let de = serde_json::Deserializer::from_slice(src); + let mut iter = de.into_iter::(); - if self.config.max_allowed_buffer_size != 0 - && buffer.len() == self.config.max_allowed_buffer_size - { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size", - )); - } - } + let item = match iter.next() { + Some(Ok(item)) => item, + Some(Err(ref e)) if e.is_eof() => return Ok(None), + Some(Err(e)) => return Err(Error::Encode(e.to_string())), + None => return Ok(None), + }; - Ok(read) + Ok(Some((iter.byte_offset(), item))) } +} - /// Writes an entire buffer into the given connection. - pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.write(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at(n); - buf = rest; - - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - } +#[derive(Clone)] +pub struct WsJsonCodec {} +impl WebSocketCodec for WsJsonCodec { + type Item = serde_json::Value; +} - Ok(()) +impl WebSocketEncoder for WsJsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem) -> Result { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + Ok(Message::Text(msg)) } +} - pub async fn read_until_with_timeout(&self, buffer: &mut Vec, t: u64) -> Result { - timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? +impl WebSocketDecoder for WsJsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &Message) -> Result { + match src { + Message::Text(s) => match serde_json::from_str(s) { + Ok(m) => Ok(m), + Err(err) => Err(Error::Decode(err.to_string())), + }, + _ => Err(Error::Decode("Receive wrong message".to_string())), + } } } diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs deleted file mode 100644 index 8bc8c49..0000000 --- a/jsonrpc/src/error.rs +++ /dev/null @@ -1,34 +0,0 @@ -use thiserror::Error as ThisError; - -pub type Result = std::result::Result; - -/// Represents karyon's jsonrpc Error. -#[derive(ThisError, Debug)] -pub enum Error { - #[error(transparent)] - IO(#[from] std::io::Error), - - #[error("Call Error: code: {0} msg: {1}")] - CallError(i32, String), - - #[error("RPC Method Error: code: {0} msg: {1}")] - RPCMethodError(i32, &'static str), - - #[error("Invalid Params: {0}")] - InvalidParams(&'static str), - - #[error("Invalid Request: {0}")] - InvalidRequest(&'static str), - - #[error(transparent)] - ParseJSON(#[from] serde_json::Error), - - #[error("Invalid Message Error: {0}")] - InvalidMsg(&'static str), - - #[error(transparent)] - KaryonCore(#[from] karyon_core::error::Error), - - #[error(transparent)] - KaryonNet(#[from] karyon_net::NetError), -} diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 3e0eb8f..1410a62 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,5 +1,12 @@ //! A fast and lightweight async implementation of [JSON-RPC -//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. +//! 2.0](https://www.jsonrpc.org/specification). +//! +//! features: +//! - Supports TCP, TLS, WebSocket, and Unix protocols. +//! - Uses smol(async-std) as the async runtime, but also supports tokio via +//! the `tokio` feature. +//! - Allows registration of multiple services (structs) of different types on a +//! single server. //! //! # Example //! @@ -9,69 +16,65 @@ //! use serde_json::Value; //! use smol::net::{TcpStream, TcpListener}; //! -//! use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; +//! use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; //! //! struct HelloWorld {} //! +//! #[rpc_impl] //! impl HelloWorld { -//! async fn say_hello(&self, params: Value) -> Result { +//! async fn say_hello(&self, params: Value) -> Result { //! let msg: String = serde_json::from_value(params)?; //! Ok(serde_json::json!(format!("Hello {msg}!"))) //! } //! -//! async fn foo(&self, params: Value) -> Result { +//! async fn foo(&self, params: Value) -> Result { //! Ok(serde_json::json!("foo!")) //! } //! -//! async fn bar(&self, params: Value) -> Result { +//! async fn bar(&self, params: Value) -> Result { //! Ok(serde_json::json!("bar!")) //! } //! } //! //! // Server //! async { -//! let ex = Arc::new(smol::Executor::new()); -//! //! // Creates a new server -//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); -//! let config = ServerConfig::default(); -//! let server = Server::new(listener, config, ex.clone()); -//! -//! // Register the HelloWorld service -//! register_service!(HelloWorld, say_hello, foo, bar); -//! server.attach_service(HelloWorld{}); +//! let server = Server::builder("tcp://127.0.0.1:60000") +//! .expect("create new server builder") +//! .service(HelloWorld{}) +//! .build() +//! .await +//! .expect("build the server"); //! //! // Starts the server -//! ex.run(server.start()); +//! server.start().await.expect("start the server"); //! }; //! //! // Client //! async { -//! //! // Creates a new client -//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); -//! let config = ClientConfig::default(); -//! let client = Client::new(conn, config); -//! -//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); +//! let client = Client::builder("tcp://127.0.0.1:60000") +//! .expect("create new client builder") +//! .build() +//! .await +//! .expect("build the client"); +//! +//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()) +//! .await +//! .expect("send a request"); //! }; //! //! ``` mod client; mod codec; -mod error; pub mod message; mod server; -mod service; -pub use client::{Client, ClientConfig}; -pub use codec::CodecConfig; -pub use error::Error as JsonRPCError; -pub use server::{Server, ServerConfig}; -pub use service::{RPCMethod, RPCService}; +pub use client::Client; +pub use server::Server; +pub use karyon_jsonrpc_internal::{impl_rpc_service, RPCMethod, RPCService}; +pub use karyon_jsonrpc_internal::{Error, Result}; +pub use karyon_jsonrpc_macro::rpc_impl; pub use karyon_net::Endpoint; - -const JSONRPC_VERSION: &str = "2.0"; -use error::{Error, Result}; diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 89ef613..f4bf490 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +pub const JSONRPC_VERSION: &str = "2.0"; + /// Parse error: Invalid JSON was received by the server. pub const PARSE_ERROR_CODE: i32 = -32700; diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 26d632a..1cc7e1f 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -1,17 +1,20 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, warn}; -use smol::lock::RwLock; -use karyon_core::async_util::{Executor, TaskGroup, TaskResult}; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use karyon_net::{Conn, Listener, ToListener}; +use karyon_core::async_runtime::Executor; +use karyon_core::async_util::{TaskGroup, TaskResult}; + +use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; use crate::{ - codec::{Codec, CodecConfig}, - message, - service::RPCService, - Endpoint, Error, Result, JSONRPC_VERSION, + codec::{JsonCodec, WsJsonCodec}, + message, Error, RPCService, Result, }; pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; @@ -27,69 +30,50 @@ fn pack_err_res(code: i32, msg: &str, id: Option) -> message: }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: Some(err), result: None, id, } } -/// RPC server config -#[derive(Default)] -pub struct ServerConfig { - codec_config: CodecConfig, -} - /// Represents an RPC server -pub struct Server<'a> { - listener: Listener, - services: RwLock>>, - task_group: TaskGroup<'a>, - config: ServerConfig, +pub struct Server { + listener: Listener, + task_group: TaskGroup, + services: HashMap>, } -impl<'a> Server<'a> { - /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls. - pub fn new(listener: T, config: ServerConfig, ex: Executor<'a>) -> Arc { - Arc::new(Self { - listener: listener.to_listener(), - services: RwLock::new(HashMap::new()), - task_group: TaskGroup::with_executor(ex), - config, - }) - } - +impl Server { /// Returns the local endpoint. pub fn local_endpoint(&self) -> Result { - self.listener.local_endpoint().map_err(Error::KaryonNet) + self.listener.local_endpoint().map_err(Error::from) } /// Starts the RPC server pub async fn start(self: Arc) -> Result<()> { loop { - let conn = self.listener.accept().await?; - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") + match self.listener.accept().await { + Ok(conn) => { + if let Err(err) = self.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } } } } - /// Attach a new service to the RPC server - pub async fn attach_service(&self, service: impl RPCService + 'a) { - self.services - .write() - .await - .insert(service.name(), Box::new(service)); - } - /// Shuts down the RPC server pub async fn shutdown(&self) { self.task_group.cancel().await; } /// Handles a new connection - async fn handle_conn(self: &Arc, conn: Conn) -> Result<()> { - let endpoint = conn.peer_endpoint()?; + async fn handle_conn(self: &Arc, conn: Conn) -> Result<()> { + let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); let on_failure = |result: TaskResult>| async move { @@ -100,19 +84,15 @@ impl<'a> Server<'a> { } }; - let codec = Codec::new(conn, self.config.codec_config.clone()); - let selfc = self.clone(); self.task_group.spawn( async move { loop { - let mut buffer = vec![]; - codec.read_until(&mut buffer).await?; - let response = selfc.handle_request(&buffer).await; - let mut payload = serde_json::to_vec(&response)?; - payload.push(b'\n'); - codec.write_all(&payload).await?; + let msg = conn.recv().await?; + let response = selfc.handle_request(msg).await; + let response = serde_json::to_value(response)?; debug!("--> {response}"); + conn.send(response).await?; } }, on_failure, @@ -122,14 +102,13 @@ impl<'a> Server<'a> { } /// Handles a request - async fn handle_request(&self, buffer: &[u8]) -> message::Response { - let rpc_msg = match serde_json::from_slice::(buffer) { + async fn handle_request(&self, msg: serde_json::Value) -> message::Response { + let rpc_msg = match serde_json::from_value::(msg) { Ok(m) => m, Err(_) => { return pack_err_res(message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG, None); } }; - debug!("<-- {rpc_msg}"); let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect(); @@ -144,9 +123,7 @@ impl<'a> Server<'a> { let srvc_name = srvc_method[0]; let method_name = srvc_method[1]; - let services = self.services.read().await; - - let service = match services.get(srvc_name) { + let service = match self.services.get(srvc_name) { Some(s) => s, None => { return pack_err_res( @@ -196,10 +173,108 @@ impl<'a> Server<'a> { }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, result: Some(result), id: Some(rpc_msg.id), } } } + +pub struct ServerBuilder { + endpoint: Endpoint, + tls_config: Option, + services: HashMap>, +} + +impl ServerBuilder { + pub fn service(mut self, service: impl RPCService + 'static) -> Self { + self.services.insert(service.name(), Box::new(service)); + self + } + + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result> { + self._build(TaskGroup::new()).await + } + + pub async fn build_with_executor(self, ex: Executor) -> Result> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result> { + let listener: Listener = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, Default::default(), JsonCodec {}) + .await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + karyon_net::ws::ServerWsConfig { + tcp_config: Default::default(), + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::listen(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + })) + } +} + +impl ServerBuilder {} + +impl Server { + pub fn builder(endpoint: impl ToEndpoint) -> Result { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/service.rs b/jsonrpc/src/service.rs deleted file mode 100644 index 23a50d9..0000000 --- a/jsonrpc/src/service.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{future::Future, pin::Pin}; - -use crate::Result; - -/// Represents the RPC method -pub type RPCMethod<'a> = Box RPCMethodOutput<'a> + Send + 'a>; -type RPCMethodOutput<'a> = - Pin> + Send + Sync + 'a>>; - -/// Defines the interface for an RPC service. -pub trait RPCService: Sync + Send { - fn get_method<'a>(&'a self, name: &'a str) -> Option; - fn name(&self) -> String; -} - -/// Implements the [`RPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{JsonRPCError, register_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// register_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! register_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { - fn get_method<'a>( - &'a self, - name: &'a str - ) -> Option { - match name { - $( - stringify!($m) => { - Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs new file mode 100644 index 0000000..e590ae1 --- /dev/null +++ b/jsonrpc/tests/impl_rpc_service.rs @@ -0,0 +1,27 @@ +use karyon_jsonrpc::{impl_rpc_service, Error, RPCService}; +use serde_json::Value; + +#[test] +fn service() { + struct Foo {} + + impl Foo { + async fn foo(&self, params: Value) -> Result { + Ok(params) + } + } + + impl_rpc_service!(Foo, foo); + + let f = Foo {}; + + assert!(f.get_method("foo").is_some()); + assert!(f.get_method("bar").is_none()); + + let params = serde_json::json!("params"); + + smol::block_on(async { + let foo_method = f.get_method("foo").unwrap(); + assert_eq!(foo_method(params.clone()).await.unwrap(), params); + }); +} diff --git a/jsonrpc/tests/rpc_impl.rs b/jsonrpc/tests/rpc_impl.rs new file mode 100644 index 0000000..5b14b59 --- /dev/null +++ b/jsonrpc/tests/rpc_impl.rs @@ -0,0 +1,26 @@ +use karyon_jsonrpc::{rpc_impl, Error, RPCService}; +use serde_json::Value; + +#[test] +fn rpc_impl_service() { + struct Foo {} + + #[rpc_impl] + impl Foo { + async fn foo(&self, params: Value) -> Result { + Ok(params) + } + } + + let f = Foo {}; + + assert!(f.get_method("foo").is_some()); + assert!(f.get_method("bar").is_none()); + + let params = serde_json::json!("params"); + + smol::block_on(async { + let foo_method = f.get_method("foo").unwrap(); + assert_eq!(foo_method(params.clone()).await.unwrap(), params); + }); +} -- cgit v1.2.3