aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc/src/server.rs')
-rw-r--r--jsonrpc/src/server.rs25
1 files changed, 20 insertions, 5 deletions
diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs
index 6c01a96..133f261 100644
--- a/jsonrpc/src/server.rs
+++ b/jsonrpc/src/server.rs
@@ -10,36 +10,49 @@ use karyons_core::{
use karyons_net::{listen, Conn, Endpoint, Listener};
use crate::{
+ codec::{Codec, CodecConfig},
message,
service::RPCService,
- utils::{read_until, write_all},
Error, Result, JSONRPC_VERSION,
};
/// Represents an RPC server
+#[derive(Default)]
+pub struct ServerConfig {
+ codec_config: CodecConfig,
+}
+
+/// Represents an RPC server
pub struct Server<'a> {
listener: Box<dyn Listener>,
services: RwLock<HashMap<String, Box<dyn RPCService + 'a>>>,
task_group: TaskGroup<'a>,
+ config: ServerConfig,
}
impl<'a> Server<'a> {
/// Creates a new RPC server.
- pub fn new(listener: Box<dyn Listener>, ex: Executor<'a>) -> Arc<Self> {
+ pub fn new(listener: Box<dyn Listener>, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> {
Arc::new(Self {
listener,
services: RwLock::new(HashMap::new()),
task_group: TaskGroup::new(ex),
+ config,
})
}
/// Creates a new RPC server using the provided endpoint.
- pub async fn new_with_endpoint(endpoint: &Endpoint, ex: Executor<'a>) -> Result<Arc<Self>> {
+ pub async fn new_with_endpoint(
+ endpoint: &Endpoint,
+ config: ServerConfig,
+ ex: Executor<'a>,
+ ) -> Result<Arc<Self>> {
let listener = listen(endpoint).await?;
Ok(Arc::new(Self {
listener,
services: RwLock::new(HashMap::new()),
task_group: TaskGroup::new(ex),
+ config,
}))
}
@@ -77,15 +90,17 @@ impl<'a> Server<'a> {
}
};
+ let codec = Codec::new(conn, self.config.codec_config.clone());
+
let selfc = self.clone();
self.task_group.spawn(
async move {
loop {
let mut buffer = vec![];
- read_until(&conn, &mut buffer).await?;
+ codec.read_until(&mut buffer).await?;
let response = selfc.handle_request(&buffer).await;
let payload = serde_json::to_vec(&response)?;
- write_all(&conn, &payload).await?;
+ codec.write_all(&payload).await?;
debug!("--> {response}");
}
},