1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
use log::debug;
use serde::{de::DeserializeOwned, Serialize};
use karyon_core::util::random_32;
use karyon_net::ToConn;
use crate::{
codec::{Codec, CodecConfig},
message, Error, Result, JSONRPC_VERSION,
};
/// Represents client config
#[derive(Default)]
pub struct ClientConfig {
pub timeout: Option<u64>,
}
/// Represents an RPC client
pub struct Client {
codec: Codec,
config: ClientConfig,
}
impl Client {
/// Creates a new RPC client by passing a Tcp, Unix, or Tls connection.
pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self {
let codec_config = CodecConfig {
max_allowed_buffer_size: 0,
..Default::default()
};
let codec = Codec::new(conn.to_conn(), codec_config);
Self { codec, config }
}
/// Calls the provided method, waits for the response, and returns the result.
pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
&self,
method: &str,
params: T,
) -> Result<V> {
let id = serde_json::json!(random_32());
let request = message::Request {
jsonrpc: JSONRPC_VERSION.to_string(),
id,
method: method.to_string(),
params: serde_json::json!(params),
};
let mut payload = serde_json::to_vec(&request)?;
payload.push(b'\n');
self.codec.write_all(&payload).await?;
debug!("--> {request}");
let mut buffer = vec![];
if let Some(t) = self.config.timeout {
self.codec.read_until_with_timeout(&mut buffer, t).await?;
} else {
self.codec.read_until(&mut buffer).await?;
};
let response = serde_json::from_slice::<message::Response>(&buffer)?;
debug!("<-- {response}");
if let Some(error) = response.error {
return Err(Error::CallError(error.code, error.message));
}
if response.id.is_none() || response.id.unwrap() != request.id {
return Err(Error::InvalidMsg("Invalid response id"));
}
match response.result {
Some(result) => Ok(serde_json::from_value::<V>(result)?),
None => Err(Error::InvalidMsg("Invalid response result")),
}
}
}
|