aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-04-11 10:19:20 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-19 13:51:30 +0200
commit0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch)
tree961d73218af672797d49f899289bef295bc56493 /jsonrpc
parenta69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff)
add support for tokio & improve net crate api
Diffstat (limited to 'jsonrpc')
-rw-r--r--jsonrpc/Cargo.toml45
-rw-r--r--jsonrpc/README.md73
-rw-r--r--jsonrpc/examples/client.py10
-rw-r--r--jsonrpc/examples/client.rs11
-rw-r--r--jsonrpc/examples/server.rs34
-rw-r--r--jsonrpc/jsonrpc_internal/Cargo.toml18
-rw-r--r--jsonrpc/jsonrpc_internal/src/error.rs (renamed from jsonrpc/src/error.rs)8
-rw-r--r--jsonrpc/jsonrpc_internal/src/lib.rs (renamed from jsonrpc/src/service.rs)17
-rw-r--r--jsonrpc/jsonrpc_macro/Cargo.toml24
-rw-r--r--jsonrpc/jsonrpc_macro/src/lib.rs47
-rw-r--r--jsonrpc/src/client.rs155
-rw-r--r--jsonrpc/src/codec.rs139
-rw-r--r--jsonrpc/src/lib.rs65
-rw-r--r--jsonrpc/src/message.rs2
-rw-r--r--jsonrpc/src/server.rs193
-rw-r--r--jsonrpc/tests/impl_rpc_service.rs27
-rw-r--r--jsonrpc/tests/rpc_impl.rs26
17 files changed, 608 insertions, 286 deletions
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml
index 73ae275..e81ec10 100644
--- a/jsonrpc/Cargo.toml
+++ b/jsonrpc/Cargo.toml
@@ -1,28 +1,49 @@
[package]
name = "karyon_jsonrpc"
-version.workspace = true
+version.workspace = true
edition.workspace = true
-autoexamples = false
+
+[features]
+default = ["smol"]
+smol = [
+ "karyon_core/smol",
+ "karyon_net/smol",
+ "karyon_jsonrpc_internal/smol",
+ "karyon_jsonrpc_macro/smol",
+ "dep:futures-rustls",
+ "async-tungstenite/async-std-runtime",
+]
+tokio = [
+ "karyon_core/tokio",
+ "karyon_net/tokio",
+ "karyon_jsonrpc_internal/tokio",
+ "karyon_jsonrpc_macro/tokio",
+ "async-tungstenite/tokio-runtime",
+ "dep:tokio-rustls",
+]
[dependencies]
-karyon_core.workspace = true
-karyon_net.workspace = true
+karyon_core = { workspace = true, default-features = false }
+karyon_net = { workspace = true, default-features = false }
+
+karyon_jsonrpc_macro = { path = "jsonrpc_macro", default-features = false }
+karyon_jsonrpc_internal = { path = "jsonrpc_internal", default-features = false }
-smol = "2.0.0"
log = "0.4.21"
rand = "0.8.5"
+async-tungstenite = { version = "0.25.0", default-features = false }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
thiserror = "1.0.58"
memchr = "2.7.1"
+async-trait = "0.1.77"
-[[example]]
-name = "server"
-path = "examples/server.rs"
-
-[[example]]
-name = "client"
-path = "examples/client.rs"
+futures-rustls = { version = "0.25.1", optional = true }
+tokio-rustls = { version = "0.26.0", optional = true }
[dev-dependencies]
env_logger = "0.11.3"
+rcgen = "0.13.1"
+rustls-pemfile = "2.1.2"
+smol = "2.0.0"
+webpki-roots = "0.26.1"
diff --git a/jsonrpc/README.md b/jsonrpc/README.md
index af7dfe2..98c18e1 100644
--- a/jsonrpc/README.md
+++ b/jsonrpc/README.md
@@ -1,52 +1,73 @@
# karyon jsonrpc
A fast and lightweight async implementation of [JSON-RPC
-2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols.
+2.0](https://www.jsonrpc.org/specification).
+
+features:
+- Supports TCP, TLS, WebSocket, and Unix protocols.
+- Uses smol(async-std) as the async runtime, but also supports tokio via the
+ `tokio` feature.
+- Allows registration of multiple services (structs) of different types on a
+ single server.
## Example
-```rust
+```
use std::sync::Arc;
use serde_json::Value;
use smol::net::{TcpStream, TcpListener};
-use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig};
+use karyon_jsonrpc::{Error, Server, Client, rpc_impl};
struct HelloWorld {}
+#[rpc_impl]
impl HelloWorld {
- async fn say_hello(&self, params: Value) -> Result<Value, JsonRPCError> {
+ async fn say_hello(&self, params: Value) -> Result<Value, Error> {
let msg: String = serde_json::from_value(params)?;
Ok(serde_json::json!(format!("Hello {msg}!")))
}
-}
-let ex = Arc::new(smol::Executor::new());
+ async fn foo(&self, params: Value) -> Result<Value, Error> {
+ Ok(serde_json::json!("foo!"))
+ }
-//////////////////
-// Server
-//////////////////
-// Creates a new server
-let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
-let config = ServerConfig::default();
-let server = Server::new(listener, config, ex.clone());
+ async fn bar(&self, params: Value) -> Result<Value, Error> {
+ Ok(serde_json::json!("bar!"))
+ }
+}
-// Register the HelloWorld service
-register_service!(HelloWorld, say_hello);
-server.attach_service(HelloWorld{});
+// Server
+async {
+ // Creates a new server
+ let server = Server::builder("tcp://127.0.0.1:60000")
+ .expect("create new server builder")
+ .service(HelloWorld{})
+ .build()
+ .await
+ .expect("build the server");
-// Starts the server
-ex.run(server.start());
+ // Starts the server
+ server.start().await.expect("start the server");
+};
-//////////////////
-// Client
-//////////////////
-// Creates a new client
-let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
-let config = ClientConfig::default();
-let client = Client::new(conn, config);
+// Client
+async {
+ // Creates a new client
+ let client = Client::builder("tcp://127.0.0.1:60000")
+ .expect("create new client builder")
+ .build()
+ .await
+ .expect("build the client");
-let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap();
+ let result: String = client.call("HelloWorld.say_hello", "world".to_string())
+ .await
+ .expect("send a request");
+};
```
+
+
+
+
diff --git a/jsonrpc/examples/client.py b/jsonrpc/examples/client.py
index 2066e82..745d5db 100644
--- a/jsonrpc/examples/client.py
+++ b/jsonrpc/examples/client.py
@@ -3,7 +3,7 @@ import random
import json
HOST = "127.0.0.1"
-PORT = 60000
+PORT = 6000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
@@ -15,7 +15,7 @@ req = {
"params": {"x": 4, "y": 3},
}
print("Send: ", req)
-s.sendall((json.dumps(req) + '\n').encode())
+s.sendall((json.dumps(req)).encode())
res = s.recv(1024)
res = json.loads(res)
print("Received: ", res)
@@ -27,7 +27,7 @@ req = {
"params": {"x": 4, "y": 3},
}
print("Send: ", req)
-s.sendall((json.dumps(req) + '\n').encode())
+s.sendall((json.dumps(req)).encode())
res = s.recv(1024)
res = json.loads(res)
print("Received: ", res)
@@ -39,7 +39,7 @@ req = {
"params": None,
}
print("Send: ", req)
-s.sendall((json.dumps(req) + '\n').encode())
+s.sendall((json.dumps(req)).encode())
res = s.recv(1024)
res = json.loads(res)
print("Received: ", res)
@@ -51,7 +51,7 @@ req = {
"params": None,
}
print("Send: ", req)
-s.sendall((json.dumps(req) + '\n').encode())
+s.sendall((json.dumps(req)).encode())
res = s.recv(1024)
res = json.loads(res)
print("Received: ", res)
diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs
index 2c8cf83..3289772 100644
--- a/jsonrpc/examples/client.rs
+++ b/jsonrpc/examples/client.rs
@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};
-use smol::net::TcpStream;
-use karyon_jsonrpc::{Client, ClientConfig};
+use karyon_jsonrpc::Client;
#[derive(Deserialize, Serialize)]
struct Req {
@@ -15,9 +14,11 @@ struct Pong {}
fn main() {
env_logger::init();
smol::future::block_on(async {
- let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
- let config = ClientConfig::default();
- let client = Client::new(conn, config);
+ let client = Client::builder("tcp://127.0.0.1:6000")
+ .expect("Create client builder")
+ .build()
+ .await
+ .unwrap();
let params = Req { x: 10, y: 7 };
let result: u32 = client.call("Calc.add", params).await.unwrap();
diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs
index 6953433..841e276 100644
--- a/jsonrpc/examples/server.rs
+++ b/jsonrpc/examples/server.rs
@@ -1,10 +1,7 @@
-use std::sync::Arc;
-
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use smol::net::TcpListener;
-use karyon_jsonrpc::{register_service, JsonRPCError, Server, ServerConfig};
+use karyon_jsonrpc::{rpc_impl, Error, Server};
struct Calc {
version: String,
@@ -19,43 +16,44 @@ struct Req {
#[derive(Deserialize, Serialize)]
struct Pong {}
+#[rpc_impl]
impl Calc {
- async fn ping(&self, _params: Value) -> Result<Value, JsonRPCError> {
+ async fn ping(&self, _params: Value) -> Result<Value, Error> {
Ok(serde_json::json!(Pong {}))
}
- async fn add(&self, params: Value) -> Result<Value, JsonRPCError> {
+ async fn add(&self, params: Value) -> Result<Value, Error> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x + params.y))
}
- async fn sub(&self, params: Value) -> Result<Value, JsonRPCError> {
+ async fn sub(&self, params: Value) -> Result<Value, Error> {
let params: Req = serde_json::from_value(params)?;
Ok(serde_json::json!(params.x - params.y))
}
- async fn version(&self, _params: Value) -> Result<Value, JsonRPCError> {
+ async fn version(&self, _params: Value) -> Result<Value, Error> {
Ok(serde_json::json!(self.version))
}
}
fn main() {
env_logger::init();
- let ex = Arc::new(smol::Executor::new());
- smol::block_on(ex.clone().run(async {
- // Creates a new server
- let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
- let config = ServerConfig::default();
- let server = Server::new(listener, config, ex);
-
+ smol::block_on(async {
// Register the Calc service
- register_service!(Calc, ping, add, sub, version);
let calc = Calc {
version: String::from("0.1"),
};
- server.attach_service(calc).await;
+
+ // Creates a new server
+ let server = Server::builder("tcp://127.0.0.1:6000")
+ .expect("Create a new server builder")
+ .service(calc)
+ .build()
+ .await
+ .expect("start a new server");
// Start the server
server.start().await.unwrap();
- }));
+ });
}
diff --git a/jsonrpc/jsonrpc_internal/Cargo.toml b/jsonrpc/jsonrpc_internal/Cargo.toml
new file mode 100644
index 0000000..5a3acc4
--- /dev/null
+++ b/jsonrpc/jsonrpc_internal/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "karyon_jsonrpc_internal"
+version.workspace = true
+edition.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[features]
+default = ["smol"]
+smol = ["karyon_core/smol", "karyon_net/smol"]
+tokio = ["karyon_core/tokio", "karyon_net/tokio"]
+
+[dependencies]
+karyon_core = { workspace = true, default-features = false }
+karyon_net = { workspace = true, default-features = false }
+
+serde_json = "1.0.114"
+thiserror = "1.0.58"
diff --git a/jsonrpc/src/error.rs b/jsonrpc/jsonrpc_internal/src/error.rs
index 8bc8c49..7f89729 100644
--- a/jsonrpc/src/error.rs
+++ b/jsonrpc/jsonrpc_internal/src/error.rs
@@ -26,9 +26,15 @@ pub enum Error {
#[error("Invalid Message Error: {0}")]
InvalidMsg(&'static str),
+ #[error("Unsupported protocol: {0}")]
+ UnsupportedProtocol(String),
+
+ #[error("Unexpected Error: {0}")]
+ General(&'static str),
+
#[error(transparent)]
KaryonCore(#[from] karyon_core::error::Error),
#[error(transparent)]
- KaryonNet(#[from] karyon_net::NetError),
+ KaryonNet(#[from] karyon_net::Error),
}
diff --git a/jsonrpc/src/service.rs b/jsonrpc/jsonrpc_internal/src/lib.rs
index 23a50d9..95af82a 100644
--- a/jsonrpc/src/service.rs
+++ b/jsonrpc/jsonrpc_internal/src/lib.rs
@@ -1,6 +1,7 @@
+mod error;
use std::{future::Future, pin::Pin};
-use crate::Result;
+pub use error::{Error, Result};
/// Represents the RPC method
pub type RPCMethod<'a> = Box<dyn Fn(serde_json::Value) -> RPCMethodOutput<'a> + Send + 'a>;
@@ -20,31 +21,31 @@ pub trait RPCService: Sync + Send {
/// ```
/// use serde_json::Value;
///
-/// use karyon_jsonrpc::{JsonRPCError, register_service};
+/// use karyon_jsonrpc_internal::{Error, impl_rpc_service};
///
/// struct Hello {}
///
/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> {
+/// async fn foo(&self, params: Value) -> Result<Value, Error> {
/// Ok(serde_json::json!("foo!"))
/// }
///
-/// async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> {
+/// async fn bar(&self, params: Value) -> Result<Value, Error> {
/// Ok(serde_json::json!("bar!"))
/// }
/// }
///
-/// register_service!(Hello, foo, bar);
+/// impl_rpc_service!(Hello, foo, bar);
///
/// ```
#[macro_export]
-macro_rules! register_service {
+macro_rules! impl_rpc_service {
($t:ty, $($m:ident),*) => {
- impl karyon_jsonrpc::RPCService for $t {
+ impl karyon_jsonrpc_internal::RPCService for $t {
fn get_method<'a>(
&'a self,
name: &'a str
- ) -> Option<karyon_jsonrpc::RPCMethod> {
+ ) -> Option<karyon_jsonrpc_internal::RPCMethod> {
match name {
$(
stringify!($m) => {
diff --git a/jsonrpc/jsonrpc_macro/Cargo.toml b/jsonrpc/jsonrpc_macro/Cargo.toml
new file mode 100644
index 0000000..17140c5
--- /dev/null
+++ b/jsonrpc/jsonrpc_macro/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "karyon_jsonrpc_macro"
+version.workspace = true
+edition.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[lib]
+proc-macro = true
+
+[features]
+default = ["smol"]
+smol = ["karyon_jsonrpc_internal/smol"]
+tokio = ["karyon_jsonrpc_internal/tokio"]
+
+[dependencies]
+karyon_jsonrpc_internal = { path = "../jsonrpc_internal", default-features = false }
+
+proc-macro2 = "1.0"
+quote = "1.0"
+syn = { version = "1.0", features = ["full"] }
+
+serde_json = "1.0.114"
+
diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs
new file mode 100644
index 0000000..f2015d4
--- /dev/null
+++ b/jsonrpc/jsonrpc_macro/src/lib.rs
@@ -0,0 +1,47 @@
+use proc_macro::TokenStream;
+use proc_macro2::{Ident, TokenStream as TokenStream2};
+use quote::quote;
+use syn::{parse_macro_input, spanned::Spanned, ImplItem, ItemImpl, Type};
+
+macro_rules! err {
+ ($($tt:tt)*) => {
+ return syn::Error::new($($tt)*).to_compile_error().into()
+ };
+}
+
+#[proc_macro_attribute]
+pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let mut methods: Vec<Ident> = vec![];
+
+ let item2 = item.clone();
+ let parsed_input = parse_macro_input!(item2 as ItemImpl);
+
+ let self_ty = match *parsed_input.self_ty {
+ Type::Path(p) => p,
+ _ => err!(
+ parsed_input.span(),
+ "implementing the trait `RPCService` on this type is unsupported"
+ ),
+ };
+
+ if parsed_input.items.is_empty() {
+ err!(self_ty.span(), "At least one method should be implemented");
+ }
+
+ for item in parsed_input.items {
+ match item {
+ ImplItem::Method(method) => {
+ methods.push(method.sig.ident);
+ }
+ _ => err!(item.span(), "unexpected item"),
+ }
+ }
+
+ let item2: TokenStream2 = item.into();
+ let quoted = quote! {
+ karyon_jsonrpc_internal::impl_rpc_service!(#self_ty, #(#methods),*);
+ #item2
+ };
+
+ quoted.into()
+}
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
index efbaf50..50d772b 100644
--- a/jsonrpc/src/client.rs
+++ b/jsonrpc/src/client.rs
@@ -1,37 +1,32 @@
+use std::time::Duration;
+
use log::debug;
use serde::{de::DeserializeOwned, Serialize};
-use karyon_core::util::random_32;
-use karyon_net::ToConn;
+#[cfg(feature = "smol")]
+use futures_rustls::rustls;
+#[cfg(feature = "tokio")]
+use tokio_rustls::rustls;
-use crate::{
- codec::{Codec, CodecConfig},
- message, Error, Result, JSONRPC_VERSION,
+use karyon_core::{async_util::timeout, util::random_32};
+use karyon_net::{
+ tls::ClientTlsConfig,
+ ws::{ClientWsConfig, ClientWssConfig},
+ Conn, Endpoint, ToEndpoint,
};
-/// Represents client config
-#[derive(Default)]
-pub struct ClientConfig {
- pub timeout: Option<u64>,
-}
+use crate::{
+ codec::{JsonCodec, WsJsonCodec},
+ message, Error, Result,
+};
/// Represents an RPC client
pub struct Client {
- codec: Codec,
- config: ClientConfig,
+ conn: Conn<serde_json::Value>,
+ timeout: Option<u64>,
}
impl Client {
- /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection.
- pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self {
- let codec_config = CodecConfig {
- max_allowed_buffer_size: 0,
- ..Default::default()
- };
- let codec = Codec::new(conn.to_conn(), codec_config);
- Self { codec, config }
- }
-
/// Calls the provided method, waits for the response, and returns the result.
pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
&self,
@@ -41,38 +36,122 @@ impl Client {
let id = serde_json::json!(random_32());
let request = message::Request {
- jsonrpc: JSONRPC_VERSION.to_string(),
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
id,
method: method.to_string(),
params: serde_json::json!(params),
};
- let mut payload = serde_json::to_vec(&request)?;
- payload.push(b'\n');
- self.codec.write_all(&payload).await?;
+ let req_json = serde_json::to_value(&request)?;
+ match self.timeout {
+ Some(s) => {
+ let dur = Duration::from_secs(s);
+ timeout(dur, self.conn.send(req_json)).await??;
+ }
+ None => {
+ self.conn.send(req_json).await?;
+ }
+ }
debug!("--> {request}");
- let mut buffer = vec![];
- if let Some(t) = self.config.timeout {
- self.codec.read_until_with_timeout(&mut buffer, t).await?;
- } else {
- self.codec.read_until(&mut buffer).await?;
- };
-
- let response = serde_json::from_slice::<message::Response>(&buffer)?;
+ let msg = self.conn.recv().await?;
+ let response = serde_json::from_value::<message::Response>(msg)?;
debug!("<-- {response}");
- if let Some(error) = response.error {
- return Err(Error::CallError(error.code, error.message));
- }
-
if response.id.is_none() || response.id.unwrap() != request.id {
return Err(Error::InvalidMsg("Invalid response id"));
}
+ if let Some(error) = response.error {
+ return Err(Error::CallError(error.code, error.message));
+ }
+
match response.result {
Some(result) => Ok(serde_json::from_value::<V>(result)?),
None => Err(Error::InvalidMsg("Invalid response result")),
}
}
}
+
+pub struct ClientBuilder {
+ endpoint: Endpoint,
+ tls_config: Option<(rustls::ClientConfig, String)>,
+ timeout: Option<u64>,
+}
+
+impl ClientBuilder {
+ pub fn with_timeout(mut self, timeout: u64) -> Self {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tls_config = Some((config, dns_name.to_string()));
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ pub async fn build(self) -> Result<Client> {
+ let conn: Conn<serde_json::Value> = match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::tls::dial(
+ &self.endpoint,
+ ClientTlsConfig {
+ dns_name,
+ client_config: conf,
+ tcp_config: Default::default(),
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
+ ),
+ },
+ Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config {
+ Some((conf, dns_name)) => Box::new(
+ karyon_net::ws::dial(
+ &self.endpoint,
+ ClientWsConfig {
+ tcp_config: Default::default(),
+ wss_config: Some(ClientWssConfig {
+ dns_name,
+ client_config: conf,
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {})
+ .await?,
+ ),
+ },
+ Endpoint::Unix(..) => Box::new(
+ karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
+ ),
+ _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ };
+ Ok(Client {
+ timeout: self.timeout,
+ conn,
+ })
+ }
+}
+impl Client {
+ pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
+ let endpoint = endpoint.to_endpoint()?;
+ Ok(ClientBuilder {
+ endpoint,
+ timeout: None,
+ tls_config: None,
+ })
+ }
+}
diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs
index 4a70412..74415c7 100644
--- a/jsonrpc/src/codec.rs
+++ b/jsonrpc/src/codec.rs
@@ -1,100 +1,73 @@
-use memchr::memchr;
+use async_tungstenite::tungstenite::Message;
-use karyon_core::async_util::timeout;
-use karyon_net::Conn;
+use karyon_net::{
+ codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder},
+ Error, Result,
+};
-use crate::{Error, Result};
-
-const DEFAULT_BUFFER_SIZE: usize = 1024;
-const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB
-
-// TODO: Add unit tests for Codec's functions.
-
-/// Represents Codec config
#[derive(Clone)]
-pub struct CodecConfig {
- pub default_buffer_size: usize,
- /// The maximum allowed buffer size to receive a message. If set to zero,
- /// there will be no size limit.
- pub max_allowed_buffer_size: usize,
-}
-
-impl Default for CodecConfig {
- fn default() -> Self {
- Self {
- default_buffer_size: DEFAULT_BUFFER_SIZE,
- max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE,
- }
- }
-}
+pub struct JsonCodec {}
-pub struct Codec {
- conn: Conn,
- config: CodecConfig,
+impl Codec for JsonCodec {
+ type Item = serde_json::Value;
}
-impl Codec {
- /// Creates a new Codec
- pub fn new(conn: Conn, config: CodecConfig) -> Self {
- Self { conn, config }
+impl Encoder for JsonCodec {
+ type EnItem = serde_json::Value;
+ fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> {
+ let msg = match serde_json::to_string(src) {
+ Ok(m) => m,
+ Err(err) => return Err(Error::Encode(err.to_string())),
+ };
+ let buf = msg.as_bytes();
+ dst[..buf.len()].copy_from_slice(buf);
+ Ok(buf.len())
}
+}
- /// Read all bytes into `buffer` until the `0x0A` byte or EOF is
- /// reached.
- ///
- /// If successful, this function will return the total number of bytes read.
- pub async fn read_until(&self, buffer: &mut Vec<u8>) -> Result<usize> {
- let delim = b'\n';
-
- let mut read = 0;
-
- loop {
- let mut tmp_buf = vec![0; self.config.default_buffer_size];
- let n = self.conn.read(&mut tmp_buf).await?;
- if n == 0 {
- return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into()));
- }
-
- match memchr(delim, &tmp_buf) {
- Some(i) => {
- buffer.extend_from_slice(&tmp_buf[..=i]);
- read += i + 1;
- break;
- }
- None => {
- buffer.extend_from_slice(&tmp_buf);
- read += tmp_buf.len();
- }
- }
+impl Decoder for JsonCodec {
+ type DeItem = serde_json::Value;
+ fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> {
+ let de = serde_json::Deserializer::from_slice(src);
+ let mut iter = de.into_iter::<serde_json::Value>();
- if self.config.max_allowed_buffer_size != 0
- && buffer.len() == self.config.max_allowed_buffer_size
- {
- return Err(Error::InvalidMsg(
- "Message exceeds the maximum allowed size",
- ));
- }
- }
+ let item = match iter.next() {
+ Some(Ok(item)) => item,
+ Some(Err(ref e)) if e.is_eof() => return Ok(None),
+ Some(Err(e)) => return Err(Error::Encode(e.to_string())),
+ None => return Ok(None),
+ };
- Ok(read)
+ Ok(Some((iter.byte_offset(), item)))
}
+}
- /// Writes an entire buffer into the given connection.
- pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> {
- while !buf.is_empty() {
- let n = self.conn.write(buf).await?;
- let (_, rest) = std::mem::take(&mut buf).split_at(n);
- buf = rest;
-
- if n == 0 {
- return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into()));
- }
- }
+#[derive(Clone)]
+pub struct WsJsonCodec {}
+impl WebSocketCodec for WsJsonCodec {
+ type Item = serde_json::Value;
+}
- Ok(())
+impl WebSocketEncoder for WsJsonCodec {
+ type EnItem = serde_json::Value;
+ fn encode(&self, src: &Self::EnItem) -> Result<Message> {
+ let msg = match serde_json::to_string(src) {
+ Ok(m) => m,
+ Err(err) => return Err(Error::Encode(err.to_string())),
+ };
+ Ok(Message::Text(msg))
}
+}
- pub async fn read_until_with_timeout(&self, buffer: &mut Vec<u8>, t: u64) -> Result<usize> {
- timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await?
+impl WebSocketDecoder for WsJsonCodec {
+ type DeItem = serde_json::Value;
+ fn decode(&self, src: &Message) -> Result<Self::DeItem> {
+ match src {
+ Message::Text(s) => match serde_json::from_str(s) {
+ Ok(m) => Ok(m),
+ Err(err) => Err(Error::Decode(err.to_string())),
+ },
+ _ => Err(Error::Decode("Receive wrong message".to_string())),
+ }
}
}
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index 3e0eb8f..1410a62 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -1,5 +1,12 @@
//! A fast and lightweight async implementation of [JSON-RPC
-//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols.
+//! 2.0](https://www.jsonrpc.org/specification).
+//!
+//! features:
+//! - Supports TCP, TLS, WebSocket, and Unix protocols.
+//! - Uses smol(async-std) as the async runtime, but also supports tokio via
+//! the `tokio` feature.
+//! - Allows registration of multiple services (structs) of different types on a
+//! single server.
//!
//! # Example
//!
@@ -9,69 +16,65 @@
//! use serde_json::Value;
//! use smol::net::{TcpStream, TcpListener};
//!
-//! use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig};
+//! use karyon_jsonrpc::{Error, Server, Client, rpc_impl};
//!
//! struct HelloWorld {}
//!
+//! #[rpc_impl]
//! impl HelloWorld {
-//! async fn say_hello(&self, params: Value) -> Result<Value, JsonRPCError> {
+//! async fn say_hello(&self, params: Value) -> Result<Value, Error> {
//! let msg: String = serde_json::from_value(params)?;
//! Ok(serde_json::json!(format!("Hello {msg}!")))
//! }
//!
-//! async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> {
+//! async fn foo(&self, params: Value) -> Result<Value, Error> {
//! Ok(serde_json::json!("foo!"))
//! }
//!
-//! async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> {
+//! async fn bar(&self, params: Value) -> Result<Value, Error> {
//! Ok(serde_json::json!("bar!"))
//! }
//! }
//!
//! // Server
//! async {
-//! let ex = Arc::new(smol::Executor::new());
-//!
//! // Creates a new server
-//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
-//! let config = ServerConfig::default();
-//! let server = Server::new(listener, config, ex.clone());
-//!
-//! // Register the HelloWorld service
-//! register_service!(HelloWorld, say_hello, foo, bar);
-//! server.attach_service(HelloWorld{});
+//! let server = Server::builder("tcp://127.0.0.1:60000")
+//! .expect("create new server builder")
+//! .service(HelloWorld{})
+//! .build()
+//! .await
+//! .expect("build the server");
//!
//! // Starts the server
-//! ex.run(server.start());
+//! server.start().await.expect("start the server");
//! };
//!
//! // Client
//! async {
-//!
//! // Creates a new client
-//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
-//! let config = ClientConfig::default();
-//! let client = Client::new(conn, config);
-//!
-//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap();
+//! let client = Client::builder("tcp://127.0.0.1:60000")
+//! .expect("create new client builder")
+//! .build()
+//! .await
+//! .expect("build the client");
+//!
+//! let result: String = client.call("HelloWorld.say_hello", "world".to_string())
+//! .await
+//! .expect("send a request");
//! };
//!
//! ```
mod client;
mod codec;
-mod error;
pub mod message;
mod server;
-mod service;
-pub use client::{Client, ClientConfig};
-pub use codec::CodecConfig;
-pub use error::Error as JsonRPCError;
-pub use server::{Server, ServerConfig};
-pub use service::{RPCMethod, RPCService};
+pub use client::Client;
+pub use server::Server;
+pub use karyon_jsonrpc_internal::{impl_rpc_service, RPCMethod, RPCService};
+pub use karyon_jsonrpc_internal::{Error, Result};
+pub use karyon_jsonrpc_macro::rpc_impl;
pub use karyon_net::Endpoint;
-
-const JSONRPC_VERSION: &str = "2.0";
-use error::{Error, Result};
diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs
index 89ef613..f4bf490 100644
--- a/jsonrpc/src/message.rs
+++ b/jsonrpc/src/message.rs
@@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
+pub const JSONRPC_VERSION: &str = "2.0";
+
/// Parse error: Invalid JSON was received by the server.
pub const PARSE_ERROR_CODE: i32 = -32700;
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs
index 26d632a..1cc7e1f 100644
--- a/jsonrpc/src/server.rs
+++ b/jsonrpc/src/server.rs
@@ -1,17 +1,20 @@
use std::{collections::HashMap, sync::Arc};
use log::{debug, error, warn};
-use smol::lock::RwLock;
-use karyon_core::async_util::{Executor, TaskGroup, TaskResult};
+#[cfg(feature = "smol")]
+use futures_rustls::rustls;
+#[cfg(feature = "tokio")]
+use tokio_rustls::rustls;
-use karyon_net::{Conn, Listener, ToListener};
+use karyon_core::async_runtime::Executor;
+use karyon_core::async_util::{TaskGroup, TaskResult};
+
+use karyon_net::{Conn, Endpoint, Listener, ToEndpoint};
use crate::{
- codec::{Codec, CodecConfig},
- message,
- service::RPCService,
- Endpoint, Error, Result, JSONRPC_VERSION,
+ codec::{JsonCodec, WsJsonCodec},
+ message, Error, RPCService, Result,
};
pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request";
@@ -27,69 +30,50 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message:
};
message::Response {
- jsonrpc: JSONRPC_VERSION.to_string(),
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
error: Some(err),
result: None,
id,
}
}
-/// RPC server config
-#[derive(Default)]
-pub struct ServerConfig {
- codec_config: CodecConfig,
-}
-
/// Represents an RPC server
-pub struct Server<'a> {
- listener: Listener,
- services: RwLock<HashMap<String, Box<dyn RPCService + 'a>>>,
- task_group: TaskGroup<'a>,
- config: ServerConfig,
+pub struct Server {
+ listener: Listener<serde_json::Value>,
+ task_group: TaskGroup,
+ services: HashMap<String, Box<dyn RPCService + 'static>>,
}
-impl<'a> Server<'a> {
- /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls.
- pub fn new<T: ToListener>(listener: T, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> {
- Arc::new(Self {
- listener: listener.to_listener(),
- services: RwLock::new(HashMap::new()),
- task_group: TaskGroup::with_executor(ex),
- config,
- })
- }
-
+impl Server {
/// Returns the local endpoint.
pub fn local_endpoint(&self) -> Result<Endpoint> {
- self.listener.local_endpoint().map_err(Error::KaryonNet)
+ self.listener.local_endpoint().map_err(Error::from)
}
/// Starts the RPC server
pub async fn start(self: Arc<Self>) -> Result<()> {
loop {
- let conn = self.listener.accept().await?;
- if let Err(err) = self.handle_conn(conn).await {
- error!("Failed to handle a new conn: {err}")
+ match self.listener.accept().await {
+ Ok(conn) => {
+ if let Err(err) = self.handle_conn(conn).await {
+ error!("Failed to handle a new conn: {err}")
+ }
+ }
+ Err(err) => {
+ error!("Failed to accept a new conn: {err}")
+ }
}
}
}
- /// Attach a new service to the RPC server
- pub async fn attach_service(&self, service: impl RPCService + 'a) {
- self.services
- .write()
- .await
- .insert(service.name(), Box::new(service));
- }
-
/// Shuts down the RPC server
pub async fn shutdown(&self) {
self.task_group.cancel().await;
}
/// Handles a new connection
- async fn handle_conn(self: &Arc<Self>, conn: Conn) -> Result<()> {
- let endpoint = conn.peer_endpoint()?;
+ async fn handle_conn(self: &Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> {
+ let endpoint = conn.peer_endpoint().expect("get peer endpoint");
debug!("Handle a new connection {endpoint}");
let on_failure = |result: TaskResult<Result<()>>| async move {
@@ -100,19 +84,15 @@ impl<'a> Server<'a> {
}
};
- let codec = Codec::new(conn, self.config.codec_config.clone());
-
let selfc = self.clone();
self.task_group.spawn(
async move {
loop {
- let mut buffer = vec![];
- codec.read_until(&mut buffer).await?;
- let response = selfc.handle_request(&buffer).await;
- let mut payload = serde_json::to_vec(&response)?;
- payload.push(b'\n');
- codec.write_all(&payload).await?;
+ let msg = conn.recv().await?;
+ let response = selfc.handle_request(msg).await;
+ let response = serde_json::to_value(response)?;
debug!("--> {response}");
+ conn.send(response).await?;
}
},
on_failure,
@@ -122,14 +102,13 @@ impl<'a> Server<'a> {
}
/// Handles a request
- async fn handle_request(&self, buffer: &[u8]) -> message::Response {
- let rpc_msg = match serde_json::from_slice::<message::Request>(buffer) {
+ async fn handle_request(&self, msg: serde_json::Value) -> message::Response {
+ let rpc_msg = match serde_json::from_value::<message::Request>(msg) {
Ok(m) => m,
Err(_) => {
return pack_err_res(message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG, None);
}
};
-
debug!("<-- {rpc_msg}");
let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect();
@@ -144,9 +123,7 @@ impl<'a> Server<'a> {
let srvc_name = srvc_method[0];
let method_name = srvc_method[1];
- let services = self.services.read().await;
-
- let service = match services.get(srvc_name) {
+ let service = match self.services.get(srvc_name) {
Some(s) => s,
None => {
return pack_err_res(
@@ -196,10 +173,108 @@ impl<'a> Server<'a> {
};
message::Response {
- jsonrpc: JSONRPC_VERSION.to_string(),
+ jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
result: Some(result),
id: Some(rpc_msg.id),
}
}
}
+
+pub struct ServerBuilder {
+ endpoint: Endpoint,
+ tls_config: Option<rustls::ServerConfig>,
+ services: HashMap<String, Box<dyn RPCService + 'static>>,
+}
+
+impl ServerBuilder {
+ pub fn service(mut self, service: impl RPCService + 'static) -> Self {
+ self.services.insert(service.name(), Box::new(service));
+ self
+ }
+
+ pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> {
+ match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
+ self.tls_config = Some(config);
+ Ok(self)
+ }
+ _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ }
+ }
+
+ pub async fn build(self) -> Result<Arc<Server>> {
+ self._build(TaskGroup::new()).await
+ }
+
+ pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> {
+ self._build(TaskGroup::with_executor(ex)).await
+ }
+
+ async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> {
+ let listener: Listener<serde_json::Value> = match self.endpoint {
+ Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::tls::listen(
+ &self.endpoint,
+ karyon_net::tls::ServerTlsConfig {
+ server_config: conf.clone(),
+ tcp_config: Default::default(),
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::tcp::listen(&self.endpoint, Default::default(), JsonCodec {})
+ .await?,
+ ),
+ },
+ Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::ws::listen(
+ &self.endpoint,
+ karyon_net::ws::ServerWsConfig {
+ tcp_config: Default::default(),
+ wss_config: Some(karyon_net::ws::ServerWssConfig {
+ server_config: conf.clone(),
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => Box::new(
+ karyon_net::ws::listen(&self.endpoint, Default::default(), WsJsonCodec {})
+ .await?,
+ ),
+ },
+ Endpoint::Unix(..) => Box::new(karyon_net::unix::listen(
+ &self.endpoint,
+ Default::default(),
+ JsonCodec {},
+ )?),
+
+ _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ };
+
+ Ok(Arc::new(Server {
+ listener,
+ task_group,
+ services: self.services,
+ }))
+ }
+}
+
+impl ServerBuilder {}
+
+impl Server {
+ pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> {
+ let endpoint = endpoint.to_endpoint()?;
+ Ok(ServerBuilder {
+ endpoint,
+ services: HashMap::new(),
+ tls_config: None,
+ })
+ }
+}
diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs
new file mode 100644
index 0000000..e590ae1
--- /dev/null
+++ b/jsonrpc/tests/impl_rpc_service.rs
@@ -0,0 +1,27 @@
+use karyon_jsonrpc::{impl_rpc_service, Error, RPCService};
+use serde_json::Value;
+
+#[test]
+fn service() {
+ struct Foo {}
+
+ impl Foo {
+ async fn foo(&self, params: Value) -> Result<Value, Error> {
+ Ok(params)
+ }
+ }
+
+ impl_rpc_service!(Foo, foo);
+
+ let f = Foo {};
+
+ assert!(f.get_method("foo").is_some());
+ assert!(f.get_method("bar").is_none());
+
+ let params = serde_json::json!("params");
+
+ smol::block_on(async {
+ let foo_method = f.get_method("foo").unwrap();
+ assert_eq!(foo_method(params.clone()).await.unwrap(), params);
+ });
+}
diff --git a/jsonrpc/tests/rpc_impl.rs b/jsonrpc/tests/rpc_impl.rs
new file mode 100644
index 0000000..5b14b59
--- /dev/null
+++ b/jsonrpc/tests/rpc_impl.rs
@@ -0,0 +1,26 @@
+use karyon_jsonrpc::{rpc_impl, Error, RPCService};
+use serde_json::Value;
+
+#[test]
+fn rpc_impl_service() {
+ struct Foo {}
+
+ #[rpc_impl]
+ impl Foo {
+ async fn foo(&self, params: Value) -> Result<Value, Error> {
+ Ok(params)
+ }
+ }
+
+ let f = Foo {};
+
+ assert!(f.get_method("foo").is_some());
+ assert!(f.get_method("bar").is_none());
+
+ let params = serde_json::json!("params");
+
+ smol::block_on(async {
+ let foo_method = f.get_method("foo").unwrap();
+ assert_eq!(foo_method(params.clone()).await.unwrap(), params);
+ });
+}