From 598f9e2d47da80f2bec2ead9c2fe215eff157936 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 20 Nov 2023 23:15:10 +0300 Subject: jsonrpc: add Codec struct for reading from and writing to the connection --- jsonrpc/src/server.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) (limited to 'jsonrpc/src/server.rs') diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 6c01a96..133f261 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -10,36 +10,49 @@ use karyons_core::{ use karyons_net::{listen, Conn, Endpoint, Listener}; use crate::{ + codec::{Codec, CodecConfig}, message, service::RPCService, - utils::{read_until, write_all}, Error, Result, JSONRPC_VERSION, }; +/// Represents an RPC server +#[derive(Default)] +pub struct ServerConfig { + codec_config: CodecConfig, +} + /// Represents an RPC server pub struct Server<'a> { listener: Box, services: RwLock>>, task_group: TaskGroup<'a>, + config: ServerConfig, } impl<'a> Server<'a> { /// Creates a new RPC server. - pub fn new(listener: Box, ex: Executor<'a>) -> Arc { + pub fn new(listener: Box, config: ServerConfig, ex: Executor<'a>) -> Arc { Arc::new(Self { listener, services: RwLock::new(HashMap::new()), task_group: TaskGroup::new(ex), + config, }) } /// Creates a new RPC server using the provided endpoint. - pub async fn new_with_endpoint(endpoint: &Endpoint, ex: Executor<'a>) -> Result> { + pub async fn new_with_endpoint( + endpoint: &Endpoint, + config: ServerConfig, + ex: Executor<'a>, + ) -> Result> { let listener = listen(endpoint).await?; Ok(Arc::new(Self { listener, services: RwLock::new(HashMap::new()), task_group: TaskGroup::new(ex), + config, })) } @@ -77,15 +90,17 @@ impl<'a> Server<'a> { } }; + let codec = Codec::new(conn, self.config.codec_config.clone()); + let selfc = self.clone(); self.task_group.spawn( async move { loop { let mut buffer = vec![]; - read_until(&conn, &mut buffer).await?; + codec.read_until(&mut buffer).await?; let response = selfc.handle_request(&buffer).await; let payload = serde_json::to_vec(&response)?; - write_all(&conn, &payload).await?; + codec.write_all(&payload).await?; debug!("--> {response}"); } }, -- cgit v1.2.3