aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/client.rs
blob: 8e413e26bca1a7d48b9d21e808ff2982366158e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use log::debug;
use serde::{de::DeserializeOwned, Serialize};

use karyon_core::util::random_32;
use karyon_net::ToConn;

use crate::{
    codec::{Codec, CodecConfig},
    message, Error, Result, JSONRPC_VERSION,
};

/// Represents client config
#[derive(Default)]
pub struct ClientConfig {
    pub timeout: Option<u64>,
}

/// Represents an RPC client
pub struct Client {
    codec: Codec,
    config: ClientConfig,
}

impl Client {
    /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection.
    pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self {
        let codec_config = CodecConfig {
            max_allowed_buffer_size: 0,
            ..Default::default()
        };
        let codec = Codec::new(conn.to_conn(), codec_config);
        Self { codec, config }
    }

    /// Calls the named method, waits for the response, and returns the result.
    pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        method: &str,
        params: T,
    ) -> Result<V> {
        let id = serde_json::json!(random_32());

        let request = message::Request {
            jsonrpc: JSONRPC_VERSION.to_string(),
            id,
            method: method.to_string(),
            params: serde_json::json!(params),
        };

        let mut payload = serde_json::to_vec(&request)?;
        payload.push(b'\n');
        self.codec.write_all(&payload).await?;
        debug!("--> {request}");

        let mut buffer = vec![];
        if let Some(t) = self.config.timeout {
            self.codec.read_until_timeout(&mut buffer, t).await?;
        } else {
            self.codec.read_until(&mut buffer).await?;
        };

        let response = serde_json::from_slice::<message::Response>(&buffer)?;
        debug!("<-- {response}");

        if let Some(error) = response.error {
            return Err(Error::CallError(error.code, error.message));
        }

        if response.id.is_none() || response.id.unwrap() != request.id {
            return Err(Error::InvalidMsg("Invalid response id"));
        }

        match response.result {
            Some(result) => Ok(serde_json::from_value::<V>(result)?),
            None => Err(Error::InvalidMsg("Invalid response result")),
        }
    }
}