aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client/builder.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/client/builder.rs')
-rw-r--r--jsonrpc/src/client/builder.rs131
1 files changed, 27 insertions, 104 deletions
diff --git a/jsonrpc/src/client/builder.rs b/jsonrpc/src/client/builder.rs
index 5a7936c..d1e3b67 100644
--- a/jsonrpc/src/client/builder.rs
+++ b/jsonrpc/src/client/builder.rs
@@ -1,26 +1,17 @@
-use std::sync::{atomic::AtomicBool, Arc};
+use std::sync::Arc;
-use karyon_core::async_util::TaskGroup;
-use karyon_net::{Conn, Endpoint, ToEndpoint};
+#[cfg(feature = "tcp")]
+use karyon_net::Endpoint;
+use karyon_net::ToEndpoint;
#[cfg(feature = "tls")]
-use karyon_net::{async_rustls::rustls, tls::ClientTlsConfig};
-
-#[cfg(feature = "ws")]
-use karyon_net::ws::ClientWsConfig;
-
-#[cfg(all(feature = "ws", feature = "tls"))]
-use karyon_net::ws::ClientWssConfig;
-
-#[cfg(feature = "ws")]
-use crate::codec::WsJsonCodec;
+use karyon_net::async_rustls::rustls;
+use crate::Result;
#[cfg(feature = "tcp")]
-use crate::TcpConfig;
+use crate::{Error, TcpConfig};
-use crate::{codec::JsonCodec, Error, Result};
-
-use super::{Client, MessageDispatcher, Subscriptions};
+use super::{Client, ClientConfig};
const DEFAULT_TIMEOUT: u64 = 3000; // 3s
@@ -44,26 +35,22 @@ impl Client {
pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
let endpoint = endpoint.to_endpoint()?;
Ok(ClientBuilder {
- endpoint,
- timeout: Some(DEFAULT_TIMEOUT),
- #[cfg(feature = "tcp")]
- tcp_config: Default::default(),
- #[cfg(feature = "tls")]
- tls_config: None,
- subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,
+ inner: ClientConfig {
+ endpoint,
+ timeout: Some(DEFAULT_TIMEOUT),
+ #[cfg(feature = "tcp")]
+ tcp_config: Default::default(),
+ #[cfg(feature = "tls")]
+ tls_config: None,
+ subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,
+ },
})
}
}
/// Builder for constructing an RPC [`Client`].
pub struct ClientBuilder {
- endpoint: Endpoint,
- #[cfg(feature = "tcp")]
- tcp_config: TcpConfig,
- #[cfg(feature = "tls")]
- tls_config: Option<(rustls::ClientConfig, String)>,
- timeout: Option<u64>,
- subscription_buffer_size: usize,
+ inner: ClientConfig,
}
impl ClientBuilder {
@@ -82,7 +69,7 @@ impl ClientBuilder {
/// };
/// ```
pub fn set_timeout(mut self, timeout: u64) -> Self {
- self.timeout = Some(timeout);
+ self.inner.timeout = Some(timeout);
self
}
@@ -106,7 +93,7 @@ impl ClientBuilder {
/// };
/// ```
pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self {
- self.subscription_buffer_size = size;
+ self.inner.subscription_buffer_size = size;
self
}
@@ -128,12 +115,12 @@ impl ClientBuilder {
/// This function will return an error if the endpoint does not support TCP protocols.
#[cfg(feature = "tcp")]
pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
- match self.endpoint {
+ match self.inner.endpoint {
Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
- self.tcp_config = config;
+ self.inner.tcp_config = config;
Ok(self)
}
- _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
+ _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())),
}
}
@@ -157,14 +144,14 @@ impl ClientBuilder {
/// This function will return an error if the endpoint does not support TLS protocols.
#[cfg(feature = "tls")]
pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
- match self.endpoint {
+ match self.inner.endpoint {
Endpoint::Tls(..) | Endpoint::Wss(..) => {
- self.tls_config = Some((config, dns_name.to_string()));
+ self.inner.tls_config = Some((config, dns_name.to_string()));
Ok(self)
}
_ => Err(Error::UnsupportedProtocol(format!(
"Invalid tls config for endpoint: {}",
- self.endpoint
+ self.inner.endpoint
))),
}
}
@@ -189,71 +176,7 @@ impl ClientBuilder {
///
/// ```
pub async fn build(self) -> Result<Arc<Client>> {
- let conn: Conn<serde_json::Value> = match self.endpoint {
- #[cfg(feature = "tcp")]
- Endpoint::Tcp(..) => Box::new(
- karyon_net::tcp::dial(&self.endpoint, self.tcp_config, JsonCodec {}).await?,
- ),
- #[cfg(feature = "tls")]
- Endpoint::Tls(..) => match self.tls_config {
- Some((conf, dns_name)) => Box::new(
- karyon_net::tls::dial(
- &self.endpoint,
- ClientTlsConfig {
- dns_name,
- client_config: conf,
- tcp_config: self.tcp_config,
- },
- JsonCodec {},
- )
- .await?,
- ),
- None => return Err(Error::TLSConfigRequired),
- },
- #[cfg(feature = "ws")]
- Endpoint::Ws(..) => {
- let config = ClientWsConfig {
- tcp_config: self.tcp_config,
- wss_config: None,
- };
- Box::new(karyon_net::ws::dial(&self.endpoint, config, WsJsonCodec {}).await?)
- }
- #[cfg(all(feature = "ws", feature = "tls"))]
- Endpoint::Wss(..) => match self.tls_config {
- Some((conf, dns_name)) => Box::new(
- karyon_net::ws::dial(
- &self.endpoint,
- ClientWsConfig {
- tcp_config: self.tcp_config,
- wss_config: Some(ClientWssConfig {
- dns_name,
- client_config: conf,
- }),
- },
- WsJsonCodec {},
- )
- .await?,
- ),
- None => return Err(Error::TLSConfigRequired),
- },
- #[cfg(all(feature = "unix", target_family = "unix"))]
- Endpoint::Unix(..) => Box::new(
- karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?,
- ),
- _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())),
- };
-
- let send_chan = async_channel::bounded(10);
-
- let client = Arc::new(Client {
- timeout: self.timeout,
- disconnect: AtomicBool::new(false),
- send_chan,
- message_dispatcher: MessageDispatcher::new(),
- subscriptions: Subscriptions::new(self.subscription_buffer_size),
- task_group: TaskGroup::new(),
- });
- client.start_background_loop(conn);
+ let client = Client::init(self.inner).await?;
Ok(client)
}
}