aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server/mod.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-06-22 15:33:24 +0200
committerhozan23 <hozan23@karyontech.net>2024-06-22 15:33:24 +0200
commit6c793e7ed3f3736e2169976f11e304f288ca6813 (patch)
tree5ea269f9b0a3148e6fc206ef166900309a0d43dc /jsonrpc/src/server/mod.rs
parent0a2c0dbc6c1afd56e9db0d93eef1ae05fe81a30b (diff)
jsonrpc: use `ServerConfig` and `ClientConfig` as the inner field in
`ServerBuilder` and `ClientBuilder`
Diffstat (limited to 'jsonrpc/src/server/mod.rs')
-rw-r--r--jsonrpc/src/server/mod.rs108
1 files changed, 102 insertions, 6 deletions
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index 00b0fd2..ddebeb9 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -8,10 +8,22 @@ use std::{collections::HashMap, sync::Arc};
use log::{debug, error, trace, warn};
-use karyon_core::async_util::{select, Either, TaskGroup, TaskResult};
+use karyon_core::{
+ async_runtime::Executor,
+ async_util::{select, Either, TaskGroup, TaskResult},
+};
+
+#[cfg(feature = "tls")]
+use karyon_net::async_rustls::rustls;
+#[cfg(feature = "tcp")]
+use karyon_net::tcp::TcpConfig;
+#[cfg(feature = "ws")]
+use karyon_net::ws::ServerWsConfig;
use karyon_net::{Conn, Endpoint, Listener};
-use crate::{message, Error, PubSubRPCService, RPCService, Result};
+#[cfg(feature = "ws")]
+use crate::codec::WsJsonCodec;
+use crate::{codec::JsonCodec, message, Error, PubSubRPCService, RPCService, Result};
use channel::Channel;
use response_queue::ResponseQueue;
@@ -32,12 +44,21 @@ enum SanityCheckResult {
ErrRes(message::Response),
}
+struct ServerConfig {
+ endpoint: Endpoint,
+ #[cfg(feature = "tcp")]
+ tcp_config: TcpConfig,
+ #[cfg(feature = "tls")]
+ tls_config: Option<rustls::ServerConfig>,
+ services: HashMap<String, Arc<dyn RPCService + 'static>>,
+ pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>,
+}
+
/// Represents an RPC server
pub struct Server {
listener: Listener<serde_json::Value>,
task_group: TaskGroup,
- services: HashMap<String, Arc<dyn RPCService + 'static>>,
- pubsub_services: HashMap<String, Arc<dyn PubSubRPCService + 'static>>,
+ config: ServerConfig,
}
impl Server {
@@ -265,7 +286,7 @@ impl Server {
};
// Check if the service exists in pubsub services list
- if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
+ if let Some(service) = self.config.pubsub_services.get(&req.srvc_name) {
// Check if the method exists within the service
if let Some(method) = service.get_pubsub_method(&req.method_name) {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
@@ -279,7 +300,7 @@ impl Server {
}
// Check if the service exists in services list
- if let Some(service) = self.services.get(&req.srvc_name) {
+ if let Some(service) = self.config.services.get(&req.srvc_name) {
// Check if the method exists within the service
if let Some(method) = service.get_method(&req.method_name) {
let params = req.msg.params.unwrap_or(serde_json::json!(()));
@@ -300,4 +321,79 @@ impl Server {
response
}
+
+ async fn init(config: ServerConfig, ex: Option<Executor>) -> Result<Arc<Self>> {
+ let task_group = match ex {
+ Some(ex) => TaskGroup::with_executor(ex),
+ None => TaskGroup::new(),
+ };
+ let listener = Self::listen(&config).await?;
+ let server = Arc::new(Server {
+ listener,
+ task_group,
+ config,
+ });
+
+ Ok(server)
+ }
+
+ async fn listen(config: &ServerConfig) -> Result<Listener<serde_json::Value>> {
+ let endpoint = config.endpoint.clone();
+ let listener: Listener<serde_json::Value> = match endpoint {
+ #[cfg(feature = "tcp")]
+ Endpoint::Tcp(..) => Box::new(
+ karyon_net::tcp::listen(&endpoint, config.tcp_config.clone(), JsonCodec {}).await?,
+ ),
+ #[cfg(feature = "tls")]
+ Endpoint::Tls(..) => match &config.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::tls::listen(
+ &endpoint,
+ karyon_net::tls::ServerTlsConfig {
+ server_config: conf.clone(),
+ tcp_config: config.tcp_config.clone(),
+ },
+ JsonCodec {},
+ )
+ .await?,
+ ),
+ None => return Err(Error::TLSConfigRequired),
+ },
+ #[cfg(feature = "ws")]
+ Endpoint::Ws(..) => {
+ let config = ServerWsConfig {
+ tcp_config: config.tcp_config.clone(),
+ wss_config: None,
+ };
+ Box::new(karyon_net::ws::listen(&endpoint, config, WsJsonCodec {}).await?)
+ }
+ #[cfg(all(feature = "ws", feature = "tls"))]
+ Endpoint::Wss(..) => match &config.tls_config {
+ Some(conf) => Box::new(
+ karyon_net::ws::listen(
+ &endpoint,
+ ServerWsConfig {
+ tcp_config: config.tcp_config.clone(),
+ wss_config: Some(karyon_net::ws::ServerWssConfig {
+ server_config: conf.clone(),
+ }),
+ },
+ WsJsonCodec {},
+ )
+ .await?,
+ ),
+ None => return Err(Error::TLSConfigRequired),
+ },
+ #[cfg(all(feature = "unix", target_family = "unix"))]
+ Endpoint::Unix(..) => Box::new(karyon_net::unix::listen(
+ &endpoint,
+ Default::default(),
+ JsonCodec {},
+ )?),
+
+ _ => return Err(Error::UnsupportedProtocol(endpoint.to_string())),
+ };
+
+ Ok(listener)
+ }
}