aboutsummaryrefslogtreecommitdiff
path: root/jsonrpc
diff options
context:
space:
mode:
Diffstat (limited to 'jsonrpc')
-rw-r--r--jsonrpc/Cargo.toml2
-rw-r--r--jsonrpc/impl/Cargo.toml (renamed from jsonrpc/jsonrpc_macro/Cargo.toml)0
-rw-r--r--jsonrpc/impl/src/lib.rs174
-rw-r--r--jsonrpc/jsonrpc_macro/src/lib.rs85
-rw-r--r--jsonrpc/src/client/mod.rs1
-rw-r--r--jsonrpc/src/server/channel.rs5
-rw-r--r--jsonrpc/src/server/mod.rs27
-rw-r--r--jsonrpc/src/server/pubsub_service.rs53
-rw-r--r--jsonrpc/src/server/service.rs50
-rw-r--r--jsonrpc/tests/impl_rpc_service.rs30
-rw-r--r--jsonrpc/tests/rpc_pubsub_impl.rs30
11 files changed, 226 insertions, 231 deletions
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml
index 41017fc..35053cd 100644
--- a/jsonrpc/Cargo.toml
+++ b/jsonrpc/Cargo.toml
@@ -33,7 +33,7 @@ tokio = [
karyon_core = { version = "0.1.5", path = "../core", default-features = false }
karyon_net = { version = "0.1.5", path = "../net", default-features = false }
-karyon_jsonrpc_macro = { version = "0.1.5", path = "jsonrpc_macro", default-features = false }
+karyon_jsonrpc_macro = { version = "0.1.5", path = "impl", default-features = false }
log = "0.4.21"
rand = "0.8.5"
diff --git a/jsonrpc/jsonrpc_macro/Cargo.toml b/jsonrpc/impl/Cargo.toml
index cf001e1..cf001e1 100644
--- a/jsonrpc/jsonrpc_macro/Cargo.toml
+++ b/jsonrpc/impl/Cargo.toml
diff --git a/jsonrpc/impl/src/lib.rs b/jsonrpc/impl/src/lib.rs
new file mode 100644
index 0000000..8814e61
--- /dev/null
+++ b/jsonrpc/impl/src/lib.rs
@@ -0,0 +1,174 @@
+use proc_macro::TokenStream;
+use proc_macro2::TokenStream as TokenStream2;
+use quote::quote;
+use syn::{
+ parse_macro_input, spanned::Spanned, FnArg, ImplItem, ItemImpl, ReturnType, Signature, Type,
+ TypePath,
+};
+
+macro_rules! err {
+ ($($tt:tt)*) => {
+ return syn::Error::new($($tt)*).to_compile_error().into()
+ };
+}
+
+#[proc_macro_attribute]
+pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let item2 = item.clone();
+ let parsed_input = parse_macro_input!(item2 as ItemImpl);
+
+ let self_ty = match *parsed_input.self_ty {
+ Type::Path(p) => p,
+ _ => err!(
+ parsed_input.span(),
+ "Implementing the trait `RPCService` on this type is unsupported"
+ ),
+ };
+
+ let methods = match parse_struct_methods(&self_ty, parsed_input.items) {
+ Ok(res) => res,
+ Err(err) => return err.to_compile_error().into(),
+ };
+
+ let mut method_idents = vec![];
+ for method in methods.iter() {
+ method_idents.push(method.ident.clone());
+ if method.inputs.len() != 2 {
+ err!(
+ method.span(),
+ "requires `&self` and a parameter of type `serde_json::Value`"
+ );
+ }
+
+ if let Err(err) = validate_method(method) {
+ return err.to_compile_error().into();
+ }
+ }
+
+ let impl_methods: Vec<TokenStream2> = method_idents.iter().map(
+ |m| quote! {
+ stringify!(#m) => Some(Box::new(move |params: serde_json::Value| Box::pin(self.#m(params)))),
+ },
+ ).collect();
+
+ let item: TokenStream2 = item.into();
+ quote! {
+ impl karyon_jsonrpc::RPCService for #self_ty {
+ fn get_method<'a>(
+ &'a self,
+ name: &'a str
+ ) -> Option<karyon_jsonrpc::RPCMethod> {
+ match name {
+ #(#impl_methods)*
+ _ => None,
+ }
+ }
+ fn name(&self) -> String{
+ stringify!(#self_ty).to_string()
+ }
+ }
+ #item
+ }
+ .into()
+}
+
+#[proc_macro_attribute]
+pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let item2 = item.clone();
+ let parsed_input = parse_macro_input!(item2 as ItemImpl);
+
+ let self_ty = match *parsed_input.self_ty {
+ Type::Path(p) => p,
+ _ => err!(
+ parsed_input.span(),
+ "Implementing the trait `PubSubRPCService` on this type is unsupported"
+ ),
+ };
+
+ let methods = match parse_struct_methods(&self_ty, parsed_input.items) {
+ Ok(res) => res,
+ Err(err) => return err.to_compile_error().into(),
+ };
+
+ let mut method_idents = vec![];
+ for method in methods.iter() {
+ method_idents.push(method.ident.clone());
+ if method.inputs.len() != 4 {
+ err!(method.span(), "requires `&self` and three parameters: `Arc<Channel>`, method: `String`, and `serde_json::Value`");
+ }
+ if let Err(err) = validate_method(method) {
+ return err.to_compile_error().into();
+ }
+ }
+
+ let impl_methods: Vec<TokenStream2> = method_idents.iter().map(
+ |m| quote! {
+ stringify!(#m) => {
+ Some(Box::new(
+ move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
+ Box::pin(self.#m(chan, method, params))
+ }))
+ },
+ },
+ ).collect();
+
+ let item: TokenStream2 = item.into();
+ quote! {
+ impl karyon_jsonrpc::PubSubRPCService for #self_ty {
+ fn get_pubsub_method<'a>(
+ &'a self,
+ name: &'a str
+ ) -> Option<karyon_jsonrpc::PubSubRPCMethod> {
+ match name {
+ #(#impl_methods)*
+ _ => None,
+ }
+ }
+
+ fn name(&self) -> String{
+ stringify!(#self_ty).to_string()
+ }
+ }
+ #item
+ }
+ .into()
+}
+
+fn parse_struct_methods(
+ self_ty: &TypePath,
+ items: Vec<ImplItem>,
+) -> Result<Vec<Signature>, syn::Error> {
+ let mut methods: Vec<Signature> = vec![];
+
+ if items.is_empty() {
+ return Err(syn::Error::new(
+ self_ty.span(),
+ "At least one method should be implemented",
+ ));
+ }
+
+ for item in items {
+ match item {
+ ImplItem::Fn(method) => {
+ methods.push(method.sig);
+ }
+ _ => return Err(syn::Error::new(item.span(), "Unexpected item!")),
+ }
+ }
+
+ Ok(methods)
+}
+
+fn validate_method(method: &Signature) -> Result<(), syn::Error> {
+ if let FnArg::Typed(_) = method.inputs[0] {
+ return Err(syn::Error::new(method.span(), "requires `&self` parameter"));
+ }
+
+ if let ReturnType::Default = method.output {
+ return Err(syn::Error::new(
+ method.span(),
+ "requires `Result<serde_json::Value, RPCError>` as return type",
+ ));
+ }
+ Ok(())
+}
diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs
deleted file mode 100644
index ecca5b1..0000000
--- a/jsonrpc/jsonrpc_macro/src/lib.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use proc_macro::TokenStream;
-use proc_macro2::{Ident, TokenStream as TokenStream2};
-use quote::quote;
-use syn::{parse_macro_input, spanned::Spanned, ImplItem, ItemImpl, Type};
-
-macro_rules! err {
- ($($tt:tt)*) => {
- return syn::Error::new($($tt)*).to_compile_error().into()
- };
-}
-
-#[proc_macro_attribute]
-pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
- let mut methods: Vec<Ident> = vec![];
-
- let item2 = item.clone();
- let parsed_input = parse_macro_input!(item2 as ItemImpl);
-
- let self_ty = match *parsed_input.self_ty {
- Type::Path(p) => p,
- _ => err!(
- parsed_input.span(),
- "implementing the trait `RPCService` on this type is unsupported"
- ),
- };
-
- if parsed_input.items.is_empty() {
- err!(self_ty.span(), "At least one method should be implemented");
- }
-
- for item in parsed_input.items {
- match item {
- ImplItem::Fn(method) => {
- methods.push(method.sig.ident);
- }
- _ => err!(item.span(), "unexpected item"),
- }
- }
-
- let item2: TokenStream2 = item.into();
- let quoted = quote! {
- karyon_jsonrpc::impl_rpc_service!(#self_ty, #(#methods),*);
- #item2
- };
-
- quoted.into()
-}
-
-// TODO remove duplicate code
-#[proc_macro_attribute]
-pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
- let mut methods: Vec<Ident> = vec![];
-
- let item2 = item.clone();
- let parsed_input = parse_macro_input!(item2 as ItemImpl);
-
- let self_ty = match *parsed_input.self_ty {
- Type::Path(p) => p,
- _ => err!(
- parsed_input.span(),
- "implementing the trait `RPCService` on this type is unsupported"
- ),
- };
-
- if parsed_input.items.is_empty() {
- err!(self_ty.span(), "At least one method should be implemented");
- }
-
- for item in parsed_input.items {
- match item {
- ImplItem::Fn(method) => {
- methods.push(method.sig.ident);
- }
- _ => err!(item.span(), "unexpected item"),
- }
- }
-
- let item2: TokenStream2 = item.into();
- let quoted = quote! {
- karyon_jsonrpc::impl_pubsub_rpc_service!(#self_ty, #(#methods),*);
- #item2
- };
-
- quoted.into()
-}
diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs
index b54656b..360a094 100644
--- a/jsonrpc/src/client/mod.rs
+++ b/jsonrpc/src/client/mod.rs
@@ -180,6 +180,7 @@ impl Client {
Ok(())
}
+ /// Initializes a new [`Client`] from the provided [`ClientConfig`].
async fn init(config: ClientConfig) -> Result<Arc<Self>> {
let client = Arc::new(Client {
disconnect: AtomicBool::new(false),
diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs
index 36896b4..7eff66f 100644
--- a/jsonrpc/src/server/channel.rs
+++ b/jsonrpc/src/server/channel.rs
@@ -52,7 +52,7 @@ impl Subscription {
}
/// Checks from the partent if this subscription is still subscribed
- pub async fn still_subscribed(&self) -> bool {
+ async fn still_subscribed(&self) -> bool {
match self.parent.upgrade() {
Some(parent) => parent.subs.lock().await.contains(&self.id),
None => false,
@@ -93,7 +93,8 @@ impl Channel {
subs.remove(i);
}
- pub fn close(&self) {
+ /// Closes the [`Channel`]
+ pub(crate) fn close(&self) {
self.chan.close();
}
}
diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs
index e2c3e3a..50a4a8e 100644
--- a/jsonrpc/src/server/mod.rs
+++ b/jsonrpc/src/server/mod.rs
@@ -64,20 +64,21 @@ pub struct Server {
}
impl Server {
- /// Returns the local endpoint.
- pub fn local_endpoint(&self) -> Result<Endpoint> {
- self.listener.local_endpoint().map_err(Error::from)
- }
-
- /// Starts the RPC server
+ /// Starts the RPC server. This will spawn a new task for the main accept loop,
+ /// which listens for incoming connections.
pub fn start(self: &Arc<Self>) {
- let on_complete = |result: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Accept loop stopped: {err}");
+ // Handle the completion of the accept loop task
+ let on_complete = {
+ let this = self.clone();
+ |result: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = result {
+ error!("Main accept loop stopped: {err}");
+ this.shutdown().await;
+ }
}
};
- // Spawns a new task for each new incoming connection
+ // Spawns a new task for the main accept loop
self.task_group.spawn(
{
let this = self.clone();
@@ -100,6 +101,11 @@ impl Server {
);
}
+ /// Returns the local endpoint.
+ pub fn local_endpoint(&self) -> Result<Endpoint> {
+ self.listener.local_endpoint().map_err(Error::from)
+ }
+
/// Shuts down the RPC server
pub async fn shutdown(&self) {
self.task_group.cancel().await;
@@ -335,6 +341,7 @@ impl Server {
response
}
+ /// Initializes a new [`Server`] from the provided [`ServerConfig`]
async fn init(config: ServerConfig, ex: Option<Executor>) -> Result<Arc<Self>> {
let task_group = match ex {
Some(ex) => TaskGroup::with_executor(ex),
diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs
index a6b4c11..909b0b0 100644
--- a/jsonrpc/src/server/pubsub_service.rs
+++ b/jsonrpc/src/server/pubsub_service.rs
@@ -15,56 +15,3 @@ pub trait PubSubRPCService: Sync + Send {
fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option<PubSubRPCMethod>;
fn name(&self) -> String;
}
-
-/// Implements the [`PubSubRPCService`] trait for a provided type.
-///
-/// # Example
-///
-/// ```
-/// use serde_json::Value;
-///
-/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
-///
-/// struct Hello {}
-///
-/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("foo!"))
-/// }
-///
-/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("bar!"))
-/// }
-/// }
-///
-/// impl_rpc_service!(Hello, foo, bar);
-///
-/// ```
-#[macro_export]
-macro_rules! impl_pubsub_rpc_service {
- ($t:ty, $($m:ident),*) => {
- impl karyon_jsonrpc::PubSubRPCService for $t {
- fn get_pubsub_method<'a>(
- &'a self,
- name: &'a str
- ) -> Option<karyon_jsonrpc::PubSubRPCMethod> {
- match name {
- $(
- stringify!($m) => {
- Some(Box::new(
- move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| {
- Box::pin(self.$m(chan, method, params))
- }))
- }
- )*
- _ => None,
- }
-
-
- }
- fn name(&self) -> String{
- stringify!($t).to_string()
- }
- }
- };
-}
diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs
index 9cc1d21..787da86 100644
--- a/jsonrpc/src/server/service.rs
+++ b/jsonrpc/src/server/service.rs
@@ -12,53 +12,3 @@ pub trait RPCService: Sync + Send {
fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>;
fn name(&self) -> String;
}
-
-/// Implements the [`RPCService`] trait for a provided type.
-///
-/// # Example
-///
-/// ```
-/// use serde_json::Value;
-///
-/// use karyon_jsonrpc::{RPCError, impl_rpc_service};
-///
-/// struct Hello {}
-///
-/// impl Hello {
-/// async fn foo(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("foo!"))
-/// }
-///
-/// async fn bar(&self, params: Value) -> Result<Value, RPCError> {
-/// Ok(serde_json::json!("bar!"))
-/// }
-/// }
-///
-/// impl_rpc_service!(Hello, foo, bar);
-///
-/// ```
-#[macro_export]
-macro_rules! impl_rpc_service {
- ($t:ty, $($m:ident),*) => {
- impl karyon_jsonrpc::RPCService for $t {
- fn get_method<'a>(
- &'a self,
- name: &'a str
- ) -> Option<karyon_jsonrpc::RPCMethod> {
- match name {
- $(
- stringify!($m) => {
- Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params))))
- }
- )*
- _ => None,
- }
-
-
- }
- fn name(&self) -> String{
- stringify!($t).to_string()
- }
- }
- };
-}
diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs
deleted file mode 100644
index 52c5c31..0000000
--- a/jsonrpc/tests/impl_rpc_service.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-use karyon_jsonrpc::{impl_rpc_service, RPCError, RPCService};
-use serde_json::Value;
-
-#[test]
-fn service() {
- struct Foo {}
-
- impl Foo {
- async fn foo(&self, params: Value) -> Result<Value, RPCError> {
- Ok(params)
- }
- }
-
- impl_rpc_service!(Foo, foo);
-
- let f = Foo {};
-
- assert!(f.get_method("foo").is_some());
- assert!(f.get_method("bar").is_none());
-
- let params = serde_json::json!("params");
-
- smol::block_on(async {
- let foo_method = f.get_method("foo").expect("Get method foo");
- assert_eq!(
- foo_method(params.clone()).await.expect("Call foo method"),
- params
- );
- });
-}
diff --git a/jsonrpc/tests/rpc_pubsub_impl.rs b/jsonrpc/tests/rpc_pubsub_impl.rs
new file mode 100644
index 0000000..9d2eb57
--- /dev/null
+++ b/jsonrpc/tests/rpc_pubsub_impl.rs
@@ -0,0 +1,30 @@
+use std::sync::Arc;
+
+use karyon_jsonrpc::{rpc_pubsub_impl, Channel, PubSubRPCService, RPCError};
+use serde_json::Value;
+
+#[test]
+fn rpc_pubsub_impl_service() {
+ struct Foo {}
+
+ #[rpc_pubsub_impl]
+ impl Foo {
+ async fn foo(
+ &self,
+ _channel: Arc<Channel>,
+ _method: String,
+ params: Value,
+ ) -> Result<Value, RPCError> {
+ Ok(params)
+ }
+ }
+
+ let f = Arc::new(Foo {});
+
+ assert!(f.get_pubsub_method("foo").is_some());
+ assert!(f.get_pubsub_method("bar").is_none());
+
+ let _params = serde_json::json!("params");
+
+ // TODO add more tests here
+}