aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-30 22:52:53 +0300
committerhozan23 <hozan23@proton.me>2023-11-30 22:52:53 +0300
commit7d6c0e68a19ad5e2e4e05cfc219d446be6ff2286 (patch)
tree2f7bfc34ca033b059702fc26bc18170dc118c20e
parent8c70a0d379b21541b5b2d1d37ff7fc61ca311cd4 (diff)
jsonrpc: Enhance the API and add support for TCP, Unix, and TLS protocols.
-rw-r--r--jsonrpc/README.md18
-rw-r--r--jsonrpc/examples/client.rs5
-rw-r--r--jsonrpc/examples/server.rs7
-rw-r--r--jsonrpc/src/client.rs15
-rw-r--r--jsonrpc/src/lib.rs21
-rw-r--r--jsonrpc/src/server.rs23
-rw-r--r--net/src/transports/tcp.rs12
-rw-r--r--net/src/transports/tls.rs12
-rw-r--r--net/src/transports/udp.rs6
-rw-r--r--net/src/transports/unix.rs12
10 files changed, 79 insertions, 52 deletions
diff --git a/jsonrpc/README.md b/jsonrpc/README.md
index d937071..929d645 100644
--- a/jsonrpc/README.md
+++ b/jsonrpc/README.md
@@ -1,6 +1,7 @@
# karyons jsonrpc
-A fast and lightweight async [JSON-RPC 2.0](https://www.jsonrpc.org/specification) implementation.
+A fast and lightweight async implementation of [JSON-RPC
+2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols.
## Example
@@ -8,6 +9,7 @@ A fast and lightweight async [JSON-RPC 2.0](https://www.jsonrpc.org/specificatio
use std::sync::Arc;
use serde_json::Value;
+use smol::net::{TcpStream, TcpListener};
use karyons_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig};
@@ -20,15 +22,15 @@ impl HelloWorld {
}
}
+let ex = Arc::new(smol::Executor::new());
+
//////////////////
// Server
//////////////////
-let ex = Arc::new(smol::Executor::new());
-
// Creates a new server
-let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
let config = ServerConfig::default();
-let server = Server::new_with_endpoint(&endpoint, config, ex.clone()).await.unwrap();
+let server = Server::new(listener.into(), config, ex.clone());
// Register the HelloWorld service
register_service!(HelloWorld, say_hello);
@@ -38,12 +40,12 @@ server.attach_service(HelloWorld{});
ex.run(server.start());
//////////////////
-// Client
+// Client
//////////////////
// Creates a new client
-let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
let config = ClientConfig::default();
-let client = Client::new_with_endpoint(&endpoint, config).await.unwrap();
+let client = Client::new(conn.into(), config);
let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap();
diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs
index 6b60233..8f46a8e 100644
--- a/jsonrpc/examples/client.rs
+++ b/jsonrpc/examples/client.rs
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
+use smol::net::TcpStream;
use karyons_jsonrpc::{Client, ClientConfig};
@@ -14,9 +15,9 @@ struct Pong {}
fn main() {
env_logger::init();
smol::future::block_on(async {
- let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+ let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
let config = ClientConfig::default();
- let client = Client::new_with_endpoint(&endpoint, config).await.unwrap();
+ let client = Client::new(conn.into(), config);
let params = Req { x: 10, y: 7 };
let result: u32 = client.call("Calc.add", params).await.unwrap();
diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs
index 512913a..4109e0d 100644
--- a/jsonrpc/examples/server.rs
+++ b/jsonrpc/examples/server.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
+use smol::net::TcpListener;
use karyons_jsonrpc::{register_service, JsonRPCError, Server, ServerConfig};
@@ -43,11 +44,9 @@ fn main() {
let ex = Arc::new(smol::Executor::new());
smol::block_on(ex.clone().run(async {
// Creates a new server
- let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+ let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
let config = ServerConfig::default();
- let server = Server::new_with_endpoint(&endpoint, config, ex)
- .await
- .unwrap();
+ let server = Server::new(listener.into(), config, ex);
// Register the Calc service
register_service!(Calc, ping, add, sub, version);
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
index 939d177..0061002 100644
--- a/jsonrpc/src/client.rs
+++ b/jsonrpc/src/client.rs
@@ -2,7 +2,7 @@ use log::debug;
use serde::{de::DeserializeOwned, Serialize};
use karyons_core::util::random_32;
-use karyons_net::{dial, Conn, Endpoint};
+use karyons_net::Conn;
use crate::{
codec::{Codec, CodecConfig},
@@ -22,7 +22,7 @@ pub struct Client {
}
impl Client {
- /// Creates a new RPC client.
+ /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection.
pub fn new(conn: Conn, config: ClientConfig) -> Self {
let codec_config = CodecConfig {
max_allowed_buffer_size: 0,
@@ -32,17 +32,6 @@ impl Client {
Self { codec, config }
}
- /// Creates a new RPC client using the provided endpoint.
- pub async fn new_with_endpoint(endpoint: &Endpoint, config: ClientConfig) -> Result<Self> {
- let conn = dial(endpoint).await?;
- let codec_config = CodecConfig {
- max_allowed_buffer_size: 0,
- ..Default::default()
- };
- let codec = Codec::new(conn, codec_config);
- Ok(Self { codec, config })
- }
-
/// Calls the named method, waits for the response, and returns the result.
pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
&self,
diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs
index f73b5e6..65fb38f 100644
--- a/jsonrpc/src/lib.rs
+++ b/jsonrpc/src/lib.rs
@@ -1,4 +1,5 @@
-//! A fast and lightweight async [JSONRPC 2.0](https://www.jsonrpc.org/specification) implementation.
+//! A fast and lightweight async implementation of [JSON-RPC
+//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols.
//!
//! # Example
//!
@@ -6,6 +7,7 @@
//! use std::sync::Arc;
//!
//! use serde_json::Value;
+//! use smol::net::{TcpStream, TcpListener};
//!
//! use karyons_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig};
//!
@@ -23,9 +25,9 @@
//! let ex = Arc::new(smol::Executor::new());
//!
//! // Creates a new server
-//! let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap();
//! let config = ServerConfig::default();
-//! let server = Server::new_with_endpoint(&endpoint, config, ex.clone()).await.unwrap();
+//! let server = Server::new(listener.into(), config, ex.clone());
//!
//! // Register the HelloWorld service
//! register_service!(HelloWorld, say_hello);
@@ -39,9 +41,9 @@
//! async {
//!
//! // Creates a new client
-//! let endpoint = "tcp://127.0.0.1:60000".parse().unwrap();
+//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap();
//! let config = ClientConfig::default();
-//! let client = Client::new_with_endpoint(&endpoint, config).await.unwrap();
+//! let client = Client::new(conn.into(), config);
//!
//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap();
//! };
@@ -55,12 +57,13 @@ pub mod message;
mod server;
mod service;
-pub const JSONRPC_VERSION: &str = "2.0";
-
-use error::{Error, Result};
-
pub use client::{Client, ClientConfig};
pub use codec::CodecConfig;
pub use error::Error as JsonRPCError;
pub use server::{Server, ServerConfig};
pub use service::{RPCMethod, RPCService};
+
+pub use karyons_net::Endpoint;
+
+const JSONRPC_VERSION: &str = "2.0";
+use error::{Error, Result};
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs
index 05ef7da..0038e89 100644
--- a/jsonrpc/src/server.rs
+++ b/jsonrpc/src/server.rs
@@ -7,13 +7,14 @@ use karyons_core::{
async_util::{TaskGroup, TaskResult},
Executor,
};
-use karyons_net::{listen, Conn, Endpoint, Listener};
+
+use karyons_net::{Conn, Listener};
use crate::{
codec::{Codec, CodecConfig},
message,
service::RPCService,
- Error, Result, JSONRPC_VERSION,
+ Endpoint, Error, Result, JSONRPC_VERSION,
};
/// RPC server config
@@ -31,7 +32,7 @@ pub struct Server<'a> {
}
impl<'a> Server<'a> {
- /// Creates a new RPC server.
+ /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls.
pub fn new(listener: Box<dyn Listener>, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> {
Arc::new(Self {
listener,
@@ -41,19 +42,9 @@ impl<'a> Server<'a> {
})
}
- /// Creates a new RPC server using the provided endpoint.
- pub async fn new_with_endpoint(
- endpoint: &Endpoint,
- config: ServerConfig,
- ex: Executor<'a>,
- ) -> Result<Arc<Self>> {
- let listener = listen(endpoint).await?;
- Ok(Arc::new(Self {
- listener,
- services: RwLock::new(HashMap::new()),
- task_group: TaskGroup::new(ex),
- config,
- }))
+ /// Returns the local endpoint.
+ pub fn local_endpoint(&self) -> Result<Endpoint> {
+ self.listener.local_endpoint().map_err(Error::KaryonsNet)
}
/// Starts the RPC server
diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs
index 37f00a7..37ad860 100644
--- a/net/src/transports/tcp.rs
+++ b/net/src/transports/tcp.rs
@@ -83,3 +83,15 @@ pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result<TcpListener> {
let listener = TcpListener::bind(address).await?;
Ok(listener)
}
+
+impl From<TcpStream> for Box<dyn Connection> {
+ fn from(conn: TcpStream) -> Self {
+ Box::new(TcpConn::new(conn))
+ }
+}
+
+impl From<TcpListener> for Box<dyn Listener> {
+ fn from(listener: TcpListener) -> Self {
+ Box::new(listener)
+ }
+}
diff --git a/net/src/transports/tls.rs b/net/src/transports/tls.rs
index 01bb5aa..cbb3d99 100644
--- a/net/src/transports/tls.rs
+++ b/net/src/transports/tls.rs
@@ -138,3 +138,15 @@ pub async fn listen(
.await
.map(|l| Box::new(l) as Box<dyn Listener>)
}
+
+impl From<TlsStream<TcpStream>> for Box<dyn Connection> {
+ fn from(conn: TlsStream<TcpStream>) -> Self {
+ Box::new(TlsConn::new(conn.get_ref().0.clone(), conn))
+ }
+}
+
+impl From<TlsListener> for Box<dyn Listener> {
+ fn from(listener: TlsListener) -> Self {
+ Box::new(listener)
+ }
+}
diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs
index 8a2fbec..9576876 100644
--- a/net/src/transports/udp.rs
+++ b/net/src/transports/udp.rs
@@ -73,3 +73,9 @@ pub async fn listen_udp(addr: &Addr, port: &Port) -> Result<UdpConn> {
let udp_conn = UdpConn::new(conn);
Ok(udp_conn)
}
+
+impl From<UdpSocket> for Box<dyn Connection> {
+ fn from(conn: UdpSocket) -> Self {
+ Box::new(UdpConn::new(conn))
+ }
+}
diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs
index e504934..0698975 100644
--- a/net/src/transports/unix.rs
+++ b/net/src/transports/unix.rs
@@ -74,3 +74,15 @@ pub fn listen_unix(path: &String) -> Result<UnixListener> {
let listener = UnixListener::bind(path)?;
Ok(listener)
}
+
+impl From<UnixStream> for Box<dyn Connection> {
+ fn from(conn: UnixStream) -> Self {
+ Box::new(UnixConn::new(conn))
+ }
+}
+
+impl From<UnixListener> for Box<dyn Listener> {
+ fn from(listener: UnixListener) -> Self {
+ Box::new(listener)
+ }
+}