aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-19 22:19:06 +0300
committerhozan23 <hozan23@proton.me>2023-11-19 22:19:06 +0300
commit4d51e3211740764764a6423f8ead4944e1790341 (patch)
tree01ba1b4d9f27913d563b6df1112d67d697627cdc /jsonrpc/src/client.rs
parent938b29d418a9df2f93ee273a394f34adc99ea25d (diff)
karyons jsonrpc implementation
Diffstat (limited to 'jsonrpc/src/client.rs')
-rw-r--r--jsonrpc/src/client.rs77
1 files changed, 77 insertions, 0 deletions
diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs
new file mode 100644
index 0000000..2863204
--- /dev/null
+++ b/jsonrpc/src/client.rs
@@ -0,0 +1,77 @@
+use log::debug;
+use serde::{de::DeserializeOwned, Serialize};
+
+use karyons_core::{async_utils::timeout, utils::random_32};
+use karyons_net::{dial, Conn, Endpoint};
+
+use crate::{
+ message,
+ utils::{read_until, write_all},
+ Error, Result, JSONRPC_VERSION,
+};
+
+/// Represents an RPC client
+pub struct Client {
+ conn: Conn,
+ timeout: Option<u64>,
+}
+
+impl Client {
+ /// Creates a new RPC client.
+ pub fn new(conn: Conn, timeout: Option<u64>) -> Self {
+ Self { conn, timeout }
+ }
+
+ /// Creates a new RPC client using the provided endpoint.
+ pub async fn new_with_endpoint(endpoint: &Endpoint, timeout: Option<u64>) -> Result<Self> {
+ let conn = dial(endpoint).await?;
+ Ok(Self { conn, timeout })
+ }
+
+ /// Calls the named method, waits for the response, and returns the result.
+ pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
+ &self,
+ method: &str,
+ params: T,
+ ) -> Result<V> {
+ let id = serde_json::json!(random_32());
+
+ let request = message::Request {
+ jsonrpc: JSONRPC_VERSION.to_string(),
+ id,
+ method: method.to_string(),
+ params: serde_json::json!(params),
+ };
+
+ let payload = serde_json::to_vec(&request)?;
+ write_all(&self.conn, &payload).await?;
+ debug!("--> {request}");
+
+ let mut buffer = vec![];
+ if let Some(t) = self.timeout {
+ timeout(
+ std::time::Duration::from_secs(t),
+ read_until(&self.conn, &mut buffer),
+ )
+ .await?
+ } else {
+ read_until(&self.conn, &mut buffer).await
+ }?;
+
+ let response = serde_json::from_slice::<message::Response>(&buffer)?;
+ debug!("<-- {response}");
+
+ if let Some(error) = response.error {
+ return Err(Error::CallError(error.code, error.message));
+ }
+
+ if response.id.is_none() || response.id.unwrap() != request.id {
+ return Err(Error::InvalidMsg("Invalid response id"));
+ }
+
+ match response.result {
+ Some(result) => Ok(serde_json::from_value::<V>(result)?),
+ None => Err(Error::InvalidMsg("Invalid response result")),
+ }
+ }
+}