diff options
Diffstat (limited to 'jsonrpc/src/server.rs')
-rw-r--r-- | jsonrpc/src/server.rs | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 6c01a96..133f261 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -10,36 +10,49 @@ use karyons_core::{ use karyons_net::{listen, Conn, Endpoint, Listener}; use crate::{ + codec::{Codec, CodecConfig}, message, service::RPCService, - utils::{read_until, write_all}, Error, Result, JSONRPC_VERSION, }; /// Represents an RPC server +#[derive(Default)] +pub struct ServerConfig { + codec_config: CodecConfig, +} + +/// Represents an RPC server pub struct Server<'a> { listener: Box<dyn Listener>, services: RwLock<HashMap<String, Box<dyn RPCService + 'a>>>, task_group: TaskGroup<'a>, + config: ServerConfig, } impl<'a> Server<'a> { /// Creates a new RPC server. - pub fn new(listener: Box<dyn Listener>, ex: Executor<'a>) -> Arc<Self> { + pub fn new(listener: Box<dyn Listener>, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> { Arc::new(Self { listener, services: RwLock::new(HashMap::new()), task_group: TaskGroup::new(ex), + config, }) } /// Creates a new RPC server using the provided endpoint. - pub async fn new_with_endpoint(endpoint: &Endpoint, ex: Executor<'a>) -> Result<Arc<Self>> { + pub async fn new_with_endpoint( + endpoint: &Endpoint, + config: ServerConfig, + ex: Executor<'a>, + ) -> Result<Arc<Self>> { let listener = listen(endpoint).await?; Ok(Arc::new(Self { listener, services: RwLock::new(HashMap::new()), task_group: TaskGroup::new(ex), + config, })) } @@ -77,15 +90,17 @@ impl<'a> Server<'a> { } }; + let codec = Codec::new(conn, self.config.codec_config.clone()); + let selfc = self.clone(); self.task_group.spawn( async move { loop { let mut buffer = vec![]; - read_until(&conn, &mut buffer).await?; + codec.read_until(&mut buffer).await?; let response = selfc.handle_request(&buffer).await; let payload = serde_json::to_vec(&response)?; - write_all(&conn, &payload).await?; + codec.write_all(&payload).await?; debug!("--> {response}"); } }, |