diff options
Diffstat (limited to 'jsonrpc/src')
-rw-r--r-- | jsonrpc/src/client.rs | 155 | ||||
-rw-r--r-- | jsonrpc/src/codec.rs | 139 | ||||
-rw-r--r-- | jsonrpc/src/error.rs | 34 | ||||
-rw-r--r-- | jsonrpc/src/lib.rs | 65 | ||||
-rw-r--r-- | jsonrpc/src/message.rs | 2 | ||||
-rw-r--r-- | jsonrpc/src/server.rs | 193 | ||||
-rw-r--r-- | jsonrpc/src/service.rs | 64 |
7 files changed, 343 insertions, 309 deletions
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index efbaf50..50d772b 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -1,37 +1,32 @@ +use std::time::Duration; + use log::debug; use serde::{de::DeserializeOwned, Serialize}; -use karyon_core::util::random_32; -use karyon_net::ToConn; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use crate::{ - codec::{Codec, CodecConfig}, - message, Error, Result, JSONRPC_VERSION, +use karyon_core::{async_util::timeout, util::random_32}; +use karyon_net::{ + tls::ClientTlsConfig, + ws::{ClientWsConfig, ClientWssConfig}, + Conn, Endpoint, ToEndpoint, }; -/// Represents client config -#[derive(Default)] -pub struct ClientConfig { - pub timeout: Option<u64>, -} +use crate::{ + codec::{JsonCodec, WsJsonCodec}, + message, Error, Result, +}; /// Represents an RPC client pub struct Client { - codec: Codec, - config: ClientConfig, + conn: Conn<serde_json::Value>, + timeout: Option<u64>, } impl Client { - /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection. - pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self { - let codec_config = CodecConfig { - max_allowed_buffer_size: 0, - ..Default::default() - }; - let codec = Codec::new(conn.to_conn(), codec_config); - Self { codec, config } - } - /// Calls the provided method, waits for the response, and returns the result. pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>( &self, @@ -41,38 +36,122 @@ impl Client { let id = serde_json::json!(random_32()); let request = message::Request { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), id, method: method.to_string(), params: serde_json::json!(params), }; - let mut payload = serde_json::to_vec(&request)?; - payload.push(b'\n'); - self.codec.write_all(&payload).await?; + let req_json = serde_json::to_value(&request)?; + match self.timeout { + Some(s) => { + let dur = Duration::from_secs(s); + timeout(dur, self.conn.send(req_json)).await??; + } + None => { + self.conn.send(req_json).await?; + } + } debug!("--> {request}"); - let mut buffer = vec![]; - if let Some(t) = self.config.timeout { - self.codec.read_until_with_timeout(&mut buffer, t).await?; - } else { - self.codec.read_until(&mut buffer).await?; - }; - - let response = serde_json::from_slice::<message::Response>(&buffer)?; + let msg = self.conn.recv().await?; + let response = serde_json::from_value::<message::Response>(msg)?; debug!("<-- {response}"); - if let Some(error) = response.error { - return Err(Error::CallError(error.code, error.message)); - } - if response.id.is_none() || response.id.unwrap() != request.id { return Err(Error::InvalidMsg("Invalid response id")); } + if let Some(error) = response.error { + return Err(Error::CallError(error.code, error.message)); + } + match response.result { Some(result) => Ok(serde_json::from_value::<V>(result)?), None => Err(Error::InvalidMsg("Invalid response result")), } } } + +pub struct ClientBuilder { + endpoint: Endpoint, + tls_config: Option<(rustls::ClientConfig, String)>, + timeout: Option<u64>, +} + +impl ClientBuilder { + pub fn with_timeout(mut self, timeout: u64) -> Self { + self.timeout = Some(timeout); + self + } + + pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some((config, dns_name.to_string())); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result<Client> { + let conn: Conn<serde_json::Value> = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.endpoint, + ClientTlsConfig { + dns_name, + client_config: conf, + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &self.endpoint, + ClientWsConfig { + tcp_config: Default::default(), + wss_config: Some(ClientWssConfig { + dns_name, + client_config: conf, + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new( + karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + Ok(Client { + timeout: self.timeout, + conn, + }) + } +} +impl Client { + pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> { + let endpoint = endpoint.to_endpoint()?; + Ok(ClientBuilder { + endpoint, + timeout: None, + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index 4a70412..74415c7 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,100 +1,73 @@ -use memchr::memchr; +use async_tungstenite::tungstenite::Message; -use karyon_core::async_util::timeout; -use karyon_net::Conn; +use karyon_net::{ + codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder}, + Error, Result, +}; -use crate::{Error, Result}; - -const DEFAULT_BUFFER_SIZE: usize = 1024; -const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB - -// TODO: Add unit tests for Codec's functions. - -/// Represents Codec config #[derive(Clone)] -pub struct CodecConfig { - pub default_buffer_size: usize, - /// The maximum allowed buffer size to receive a message. If set to zero, - /// there will be no size limit. - pub max_allowed_buffer_size: usize, -} - -impl Default for CodecConfig { - fn default() -> Self { - Self { - default_buffer_size: DEFAULT_BUFFER_SIZE, - max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE, - } - } -} +pub struct JsonCodec {} -pub struct Codec { - conn: Conn, - config: CodecConfig, +impl Codec for JsonCodec { + type Item = serde_json::Value; } -impl Codec { - /// Creates a new Codec - pub fn new(conn: Conn, config: CodecConfig) -> Self { - Self { conn, config } +impl Encoder for JsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + let buf = msg.as_bytes(); + dst[..buf.len()].copy_from_slice(buf); + Ok(buf.len()) } +} - /// Read all bytes into `buffer` until the `0x0A` byte or EOF is - /// reached. - /// - /// If successful, this function will return the total number of bytes read. - pub async fn read_until(&self, buffer: &mut Vec<u8>) -> Result<usize> { - let delim = b'\n'; - - let mut read = 0; - - loop { - let mut tmp_buf = vec![0; self.config.default_buffer_size]; - let n = self.conn.read(&mut tmp_buf).await?; - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - - match memchr(delim, &tmp_buf) { - Some(i) => { - buffer.extend_from_slice(&tmp_buf[..=i]); - read += i + 1; - break; - } - None => { - buffer.extend_from_slice(&tmp_buf); - read += tmp_buf.len(); - } - } +impl Decoder for JsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + let de = serde_json::Deserializer::from_slice(src); + let mut iter = de.into_iter::<serde_json::Value>(); - if self.config.max_allowed_buffer_size != 0 - && buffer.len() == self.config.max_allowed_buffer_size - { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size", - )); - } - } + let item = match iter.next() { + Some(Ok(item)) => item, + Some(Err(ref e)) if e.is_eof() => return Ok(None), + Some(Err(e)) => return Err(Error::Encode(e.to_string())), + None => return Ok(None), + }; - Ok(read) + Ok(Some((iter.byte_offset(), item))) } +} - /// Writes an entire buffer into the given connection. - pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.write(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at(n); - buf = rest; - - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - } +#[derive(Clone)] +pub struct WsJsonCodec {} +impl WebSocketCodec for WsJsonCodec { + type Item = serde_json::Value; +} - Ok(()) +impl WebSocketEncoder for WsJsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem) -> Result<Message> { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + Ok(Message::Text(msg)) } +} - pub async fn read_until_with_timeout(&self, buffer: &mut Vec<u8>, t: u64) -> Result<usize> { - timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? +impl WebSocketDecoder for WsJsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &Message) -> Result<Self::DeItem> { + match src { + Message::Text(s) => match serde_json::from_str(s) { + Ok(m) => Ok(m), + Err(err) => Err(Error::Decode(err.to_string())), + }, + _ => Err(Error::Decode("Receive wrong message".to_string())), + } } } diff --git a/jsonrpc/src/error.rs b/jsonrpc/src/error.rs deleted file mode 100644 index 8bc8c49..0000000 --- a/jsonrpc/src/error.rs +++ /dev/null @@ -1,34 +0,0 @@ -use thiserror::Error as ThisError; - -pub type Result<T> = std::result::Result<T, Error>; - -/// Represents karyon's jsonrpc Error. -#[derive(ThisError, Debug)] -pub enum Error { - #[error(transparent)] - IO(#[from] std::io::Error), - - #[error("Call Error: code: {0} msg: {1}")] - CallError(i32, String), - - #[error("RPC Method Error: code: {0} msg: {1}")] - RPCMethodError(i32, &'static str), - - #[error("Invalid Params: {0}")] - InvalidParams(&'static str), - - #[error("Invalid Request: {0}")] - InvalidRequest(&'static str), - - #[error(transparent)] - ParseJSON(#[from] serde_json::Error), - - #[error("Invalid Message Error: {0}")] - InvalidMsg(&'static str), - - #[error(transparent)] - KaryonCore(#[from] karyon_core::error::Error), - - #[error(transparent)] - KaryonNet(#[from] karyon_net::NetError), -} diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 3e0eb8f..1410a62 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,5 +1,12 @@ //! A fast and lightweight async implementation of [JSON-RPC -//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. +//! 2.0](https://www.jsonrpc.org/specification). +//! +//! features: +//! - Supports TCP, TLS, WebSocket, and Unix protocols. +//! - Uses smol(async-std) as the async runtime, but also supports tokio via +//! the `tokio` feature. +//! - Allows registration of multiple services (structs) of different types on a +//! single server. //! //! # Example //! @@ -9,69 +16,65 @@ //! use serde_json::Value; //! use smol::net::{TcpStream, TcpListener}; //! -//! use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; +//! use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; //! //! struct HelloWorld {} //! +//! #[rpc_impl] //! impl HelloWorld { -//! async fn say_hello(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn say_hello(&self, params: Value) -> Result<Value, Error> { //! let msg: String = serde_json::from_value(params)?; //! Ok(serde_json::json!(format!("Hello {msg}!"))) //! } //! -//! async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn foo(&self, params: Value) -> Result<Value, Error> { //! Ok(serde_json::json!("foo!")) //! } //! -//! async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn bar(&self, params: Value) -> Result<Value, Error> { //! Ok(serde_json::json!("bar!")) //! } //! } //! //! // Server //! async { -//! let ex = Arc::new(smol::Executor::new()); -//! //! // Creates a new server -//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); -//! let config = ServerConfig::default(); -//! let server = Server::new(listener, config, ex.clone()); -//! -//! // Register the HelloWorld service -//! register_service!(HelloWorld, say_hello, foo, bar); -//! server.attach_service(HelloWorld{}); +//! let server = Server::builder("tcp://127.0.0.1:60000") +//! .expect("create new server builder") +//! .service(HelloWorld{}) +//! .build() +//! .await +//! .expect("build the server"); //! //! // Starts the server -//! ex.run(server.start()); +//! server.start().await.expect("start the server"); //! }; //! //! // Client //! async { -//! //! // Creates a new client -//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); -//! let config = ClientConfig::default(); -//! let client = Client::new(conn, config); -//! -//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); +//! let client = Client::builder("tcp://127.0.0.1:60000") +//! .expect("create new client builder") +//! .build() +//! .await +//! .expect("build the client"); +//! +//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()) +//! .await +//! .expect("send a request"); //! }; //! //! ``` mod client; mod codec; -mod error; pub mod message; mod server; -mod service; -pub use client::{Client, ClientConfig}; -pub use codec::CodecConfig; -pub use error::Error as JsonRPCError; -pub use server::{Server, ServerConfig}; -pub use service::{RPCMethod, RPCService}; +pub use client::Client; +pub use server::Server; +pub use karyon_jsonrpc_internal::{impl_rpc_service, RPCMethod, RPCService}; +pub use karyon_jsonrpc_internal::{Error, Result}; +pub use karyon_jsonrpc_macro::rpc_impl; pub use karyon_net::Endpoint; - -const JSONRPC_VERSION: &str = "2.0"; -use error::{Error, Result}; diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 89ef613..f4bf490 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +pub const JSONRPC_VERSION: &str = "2.0"; + /// Parse error: Invalid JSON was received by the server. pub const PARSE_ERROR_CODE: i32 = -32700; diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 26d632a..1cc7e1f 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -1,17 +1,20 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, warn}; -use smol::lock::RwLock; -use karyon_core::async_util::{Executor, TaskGroup, TaskResult}; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use karyon_net::{Conn, Listener, ToListener}; +use karyon_core::async_runtime::Executor; +use karyon_core::async_util::{TaskGroup, TaskResult}; + +use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; use crate::{ - codec::{Codec, CodecConfig}, - message, - service::RPCService, - Endpoint, Error, Result, JSONRPC_VERSION, + codec::{JsonCodec, WsJsonCodec}, + message, Error, RPCService, Result, }; pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; @@ -27,69 +30,50 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message: }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: Some(err), result: None, id, } } -/// RPC server config -#[derive(Default)] -pub struct ServerConfig { - codec_config: CodecConfig, -} - /// Represents an RPC server -pub struct Server<'a> { - listener: Listener, - services: RwLock<HashMap<String, Box<dyn RPCService + 'a>>>, - task_group: TaskGroup<'a>, - config: ServerConfig, +pub struct Server { + listener: Listener<serde_json::Value>, + task_group: TaskGroup, + services: HashMap<String, Box<dyn RPCService + 'static>>, } -impl<'a> Server<'a> { - /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls. - pub fn new<T: ToListener>(listener: T, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> { - Arc::new(Self { - listener: listener.to_listener(), - services: RwLock::new(HashMap::new()), - task_group: TaskGroup::with_executor(ex), - config, - }) - } - +impl Server { /// Returns the local endpoint. pub fn local_endpoint(&self) -> Result<Endpoint> { - self.listener.local_endpoint().map_err(Error::KaryonNet) + self.listener.local_endpoint().map_err(Error::from) } /// Starts the RPC server pub async fn start(self: Arc<Self>) -> Result<()> { loop { - let conn = self.listener.accept().await?; - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") + match self.listener.accept().await { + Ok(conn) => { + if let Err(err) = self.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } } } } - /// Attach a new service to the RPC server - pub async fn attach_service(&self, service: impl RPCService + 'a) { - self.services - .write() - .await - .insert(service.name(), Box::new(service)); - } - /// Shuts down the RPC server pub async fn shutdown(&self) { self.task_group.cancel().await; } /// Handles a new connection - async fn handle_conn(self: &Arc<Self>, conn: Conn) -> Result<()> { - let endpoint = conn.peer_endpoint()?; + async fn handle_conn(self: &Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> { + let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); let on_failure = |result: TaskResult<Result<()>>| async move { @@ -100,19 +84,15 @@ impl<'a> Server<'a> { } }; - let codec = Codec::new(conn, self.config.codec_config.clone()); - let selfc = self.clone(); self.task_group.spawn( async move { loop { - let mut buffer = vec![]; - codec.read_until(&mut buffer).await?; - let response = selfc.handle_request(&buffer).await; - let mut payload = serde_json::to_vec(&response)?; - payload.push(b'\n'); - codec.write_all(&payload).await?; + let msg = conn.recv().await?; + let response = selfc.handle_request(msg).await; + let response = serde_json::to_value(response)?; debug!("--> {response}"); + conn.send(response).await?; } }, on_failure, @@ -122,14 +102,13 @@ impl<'a> Server<'a> { } /// Handles a request - async fn handle_request(&self, buffer: &[u8]) -> message::Response { - let rpc_msg = match serde_json::from_slice::<message::Request>(buffer) { + async fn handle_request(&self, msg: serde_json::Value) -> message::Response { + let rpc_msg = match serde_json::from_value::<message::Request>(msg) { Ok(m) => m, Err(_) => { return pack_err_res(message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG, None); } }; - debug!("<-- {rpc_msg}"); let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect(); @@ -144,9 +123,7 @@ impl<'a> Server<'a> { let srvc_name = srvc_method[0]; let method_name = srvc_method[1]; - let services = self.services.read().await; - - let service = match services.get(srvc_name) { + let service = match self.services.get(srvc_name) { Some(s) => s, None => { return pack_err_res( @@ -196,10 +173,108 @@ impl<'a> Server<'a> { }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, result: Some(result), id: Some(rpc_msg.id), } } } + +pub struct ServerBuilder { + endpoint: Endpoint, + tls_config: Option<rustls::ServerConfig>, + services: HashMap<String, Box<dyn RPCService + 'static>>, +} + +impl ServerBuilder { + pub fn service(mut self, service: impl RPCService + 'static) -> Self { + self.services.insert(service.name(), Box::new(service)); + self + } + + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result<Arc<Server>> { + self._build(TaskGroup::new()).await + } + + pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> { + let listener: Listener<serde_json::Value> = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, Default::default(), JsonCodec {}) + .await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + karyon_net::ws::ServerWsConfig { + tcp_config: Default::default(), + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::listen(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + })) + } +} + +impl ServerBuilder {} + +impl Server { + pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/service.rs b/jsonrpc/src/service.rs deleted file mode 100644 index 23a50d9..0000000 --- a/jsonrpc/src/service.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{future::Future, pin::Pin}; - -use crate::Result; - -/// Represents the RPC method -pub type RPCMethod<'a> = Box<dyn Fn(serde_json::Value) -> RPCMethodOutput<'a> + Send + 'a>; -type RPCMethodOutput<'a> = - Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + Sync + 'a>>; - -/// Defines the interface for an RPC service. -pub trait RPCService: Sync + Send { - fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>; - fn name(&self) -> String; -} - -/// Implements the [`RPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{JsonRPCError, register_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// register_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! register_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { - fn get_method<'a>( - &'a self, - name: &'a str - ) -> Option<karyon_jsonrpc::RPCMethod> { - match name { - $( - stringify!($m) => { - Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} |