From 7d6c0e68a19ad5e2e4e05cfc219d446be6ff2286 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 30 Nov 2023 22:52:53 +0300 Subject: jsonrpc: Enhance the API and add support for TCP, Unix, and TLS protocols. --- jsonrpc/README.md | 18 ++++++++++-------- jsonrpc/examples/client.rs | 5 +++-- jsonrpc/examples/server.rs | 7 +++---- jsonrpc/src/client.rs | 15 ++------------- jsonrpc/src/lib.rs | 21 ++++++++++++--------- jsonrpc/src/server.rs | 23 +++++++---------------- net/src/transports/tcp.rs | 12 ++++++++++++ net/src/transports/tls.rs | 12 ++++++++++++ net/src/transports/udp.rs | 6 ++++++ net/src/transports/unix.rs | 12 ++++++++++++ 10 files changed, 79 insertions(+), 52 deletions(-) diff --git a/jsonrpc/README.md b/jsonrpc/README.md index d937071..929d645 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -1,6 +1,7 @@ # karyons jsonrpc -A fast and lightweight async [JSON-RPC 2.0](https://www.jsonrpc.org/specification) implementation. +A fast and lightweight async implementation of [JSON-RPC +2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. ## Example @@ -8,6 +9,7 @@ A fast and lightweight async [JSON-RPC 2.0](https://www.jsonrpc.org/specificatio use std::sync::Arc; use serde_json::Value; +use smol::net::{TcpStream, TcpListener}; use karyons_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; @@ -20,15 +22,15 @@ impl HelloWorld { } } +let ex = Arc::new(smol::Executor::new()); + ////////////////// // Server ////////////////// -let ex = Arc::new(smol::Executor::new()); - // Creates a new server -let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); +let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); let config = ServerConfig::default(); -let server = Server::new_with_endpoint(&endpoint, config, ex.clone()).await.unwrap(); +let server = Server::new(listener.into(), config, ex.clone()); // Register the HelloWorld service register_service!(HelloWorld, say_hello); @@ -38,12 +40,12 @@ server.attach_service(HelloWorld{}); ex.run(server.start()); ////////////////// -// Client +// Client ////////////////// // Creates a new client -let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); +let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); let config = ClientConfig::default(); -let client = Client::new_with_endpoint(&endpoint, config).await.unwrap(); +let client = Client::new(conn.into(), config); let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs index 6b60233..8f46a8e 100644 --- a/jsonrpc/examples/client.rs +++ b/jsonrpc/examples/client.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use smol::net::TcpStream; use karyons_jsonrpc::{Client, ClientConfig}; @@ -14,9 +15,9 @@ struct Pong {} fn main() { env_logger::init(); smol::future::block_on(async { - let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); + let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); let config = ClientConfig::default(); - let client = Client::new_with_endpoint(&endpoint, config).await.unwrap(); + let client = Client::new(conn.into(), config); let params = Req { x: 10, y: 7 }; let result: u32 = client.call("Calc.add", params).await.unwrap(); diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 512913a..4109e0d 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use serde_json::Value; +use smol::net::TcpListener; use karyons_jsonrpc::{register_service, JsonRPCError, Server, ServerConfig}; @@ -43,11 +44,9 @@ fn main() { let ex = Arc::new(smol::Executor::new()); smol::block_on(ex.clone().run(async { // Creates a new server - let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); + let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); let config = ServerConfig::default(); - let server = Server::new_with_endpoint(&endpoint, config, ex) - .await - .unwrap(); + let server = Server::new(listener.into(), config, ex); // Register the Calc service register_service!(Calc, ping, add, sub, version); diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index 939d177..0061002 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -2,7 +2,7 @@ use log::debug; use serde::{de::DeserializeOwned, Serialize}; use karyons_core::util::random_32; -use karyons_net::{dial, Conn, Endpoint}; +use karyons_net::Conn; use crate::{ codec::{Codec, CodecConfig}, @@ -22,7 +22,7 @@ pub struct Client { } impl Client { - /// Creates a new RPC client. + /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection. pub fn new(conn: Conn, config: ClientConfig) -> Self { let codec_config = CodecConfig { max_allowed_buffer_size: 0, @@ -32,17 +32,6 @@ impl Client { Self { codec, config } } - /// Creates a new RPC client using the provided endpoint. - pub async fn new_with_endpoint(endpoint: &Endpoint, config: ClientConfig) -> Result { - let conn = dial(endpoint).await?; - let codec_config = CodecConfig { - max_allowed_buffer_size: 0, - ..Default::default() - }; - let codec = Codec::new(conn, codec_config); - Ok(Self { codec, config }) - } - /// Calls the named method, waits for the response, and returns the result. pub async fn call( &self, diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index f73b5e6..65fb38f 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,4 +1,5 @@ -//! A fast and lightweight async [JSONRPC 2.0](https://www.jsonrpc.org/specification) implementation. +//! A fast and lightweight async implementation of [JSON-RPC +//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. //! //! # Example //! @@ -6,6 +7,7 @@ //! use std::sync::Arc; //! //! use serde_json::Value; +//! use smol::net::{TcpStream, TcpListener}; //! //! use karyons_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; //! @@ -23,9 +25,9 @@ //! let ex = Arc::new(smol::Executor::new()); //! //! // Creates a new server -//! let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); +//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); //! let config = ServerConfig::default(); -//! let server = Server::new_with_endpoint(&endpoint, config, ex.clone()).await.unwrap(); +//! let server = Server::new(listener.into(), config, ex.clone()); //! //! // Register the HelloWorld service //! register_service!(HelloWorld, say_hello); @@ -39,9 +41,9 @@ //! async { //! //! // Creates a new client -//! let endpoint = "tcp://127.0.0.1:60000".parse().unwrap(); +//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); //! let config = ClientConfig::default(); -//! let client = Client::new_with_endpoint(&endpoint, config).await.unwrap(); +//! let client = Client::new(conn.into(), config); //! //! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); //! }; @@ -55,12 +57,13 @@ pub mod message; mod server; mod service; -pub const JSONRPC_VERSION: &str = "2.0"; - -use error::{Error, Result}; - pub use client::{Client, ClientConfig}; pub use codec::CodecConfig; pub use error::Error as JsonRPCError; pub use server::{Server, ServerConfig}; pub use service::{RPCMethod, RPCService}; + +pub use karyons_net::Endpoint; + +const JSONRPC_VERSION: &str = "2.0"; +use error::{Error, Result}; diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 05ef7da..0038e89 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -7,13 +7,14 @@ use karyons_core::{ async_util::{TaskGroup, TaskResult}, Executor, }; -use karyons_net::{listen, Conn, Endpoint, Listener}; + +use karyons_net::{Conn, Listener}; use crate::{ codec::{Codec, CodecConfig}, message, service::RPCService, - Error, Result, JSONRPC_VERSION, + Endpoint, Error, Result, JSONRPC_VERSION, }; /// RPC server config @@ -31,7 +32,7 @@ pub struct Server<'a> { } impl<'a> Server<'a> { - /// Creates a new RPC server. + /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls. pub fn new(listener: Box, config: ServerConfig, ex: Executor<'a>) -> Arc { Arc::new(Self { listener, @@ -41,19 +42,9 @@ impl<'a> Server<'a> { }) } - /// Creates a new RPC server using the provided endpoint. - pub async fn new_with_endpoint( - endpoint: &Endpoint, - config: ServerConfig, - ex: Executor<'a>, - ) -> Result> { - let listener = listen(endpoint).await?; - Ok(Arc::new(Self { - listener, - services: RwLock::new(HashMap::new()), - task_group: TaskGroup::new(ex), - config, - })) + /// Returns the local endpoint. + pub fn local_endpoint(&self) -> Result { + self.listener.local_endpoint().map_err(Error::KaryonsNet) } /// Starts the RPC server diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs index 37f00a7..37ad860 100644 --- a/net/src/transports/tcp.rs +++ b/net/src/transports/tcp.rs @@ -83,3 +83,15 @@ pub async fn listen_tcp(addr: &Addr, port: &Port) -> Result { let listener = TcpListener::bind(address).await?; Ok(listener) } + +impl From for Box { + fn from(conn: TcpStream) -> Self { + Box::new(TcpConn::new(conn)) + } +} + +impl From for Box { + fn from(listener: TcpListener) -> Self { + Box::new(listener) + } +} diff --git a/net/src/transports/tls.rs b/net/src/transports/tls.rs index 01bb5aa..cbb3d99 100644 --- a/net/src/transports/tls.rs +++ b/net/src/transports/tls.rs @@ -138,3 +138,15 @@ pub async fn listen( .await .map(|l| Box::new(l) as Box) } + +impl From> for Box { + fn from(conn: TlsStream) -> Self { + Box::new(TlsConn::new(conn.get_ref().0.clone(), conn)) + } +} + +impl From for Box { + fn from(listener: TlsListener) -> Self { + Box::new(listener) + } +} diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs index 8a2fbec..9576876 100644 --- a/net/src/transports/udp.rs +++ b/net/src/transports/udp.rs @@ -73,3 +73,9 @@ pub async fn listen_udp(addr: &Addr, port: &Port) -> Result { let udp_conn = UdpConn::new(conn); Ok(udp_conn) } + +impl From for Box { + fn from(conn: UdpSocket) -> Self { + Box::new(UdpConn::new(conn)) + } +} diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index e504934..0698975 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -74,3 +74,15 @@ pub fn listen_unix(path: &String) -> Result { let listener = UnixListener::bind(path)?; Ok(listener) } + +impl From for Box { + fn from(conn: UnixStream) -> Self { + Box::new(UnixConn::new(conn)) + } +} + +impl From for Box { + fn from(listener: UnixListener) -> Self { + Box::new(listener) + } +} -- cgit v1.2.3