diff options
Diffstat (limited to 'jsonrpc')
-rw-r--r-- | jsonrpc/Cargo.toml | 2 | ||||
-rw-r--r-- | jsonrpc/impl/Cargo.toml (renamed from jsonrpc/jsonrpc_macro/Cargo.toml) | 0 | ||||
-rw-r--r-- | jsonrpc/impl/src/lib.rs | 174 | ||||
-rw-r--r-- | jsonrpc/jsonrpc_macro/src/lib.rs | 85 | ||||
-rw-r--r-- | jsonrpc/src/client/mod.rs | 1 | ||||
-rw-r--r-- | jsonrpc/src/server/channel.rs | 5 | ||||
-rw-r--r-- | jsonrpc/src/server/mod.rs | 27 | ||||
-rw-r--r-- | jsonrpc/src/server/pubsub_service.rs | 53 | ||||
-rw-r--r-- | jsonrpc/src/server/service.rs | 50 | ||||
-rw-r--r-- | jsonrpc/tests/impl_rpc_service.rs | 30 | ||||
-rw-r--r-- | jsonrpc/tests/rpc_pubsub_impl.rs | 30 |
11 files changed, 226 insertions, 231 deletions
diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 41017fc..35053cd 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -33,7 +33,7 @@ tokio = [ karyon_core = { version = "0.1.5", path = "../core", default-features = false } karyon_net = { version = "0.1.5", path = "../net", default-features = false } -karyon_jsonrpc_macro = { version = "0.1.5", path = "jsonrpc_macro", default-features = false } +karyon_jsonrpc_macro = { version = "0.1.5", path = "impl", default-features = false } log = "0.4.21" rand = "0.8.5" diff --git a/jsonrpc/jsonrpc_macro/Cargo.toml b/jsonrpc/impl/Cargo.toml index cf001e1..cf001e1 100644 --- a/jsonrpc/jsonrpc_macro/Cargo.toml +++ b/jsonrpc/impl/Cargo.toml diff --git a/jsonrpc/impl/src/lib.rs b/jsonrpc/impl/src/lib.rs new file mode 100644 index 0000000..8814e61 --- /dev/null +++ b/jsonrpc/impl/src/lib.rs @@ -0,0 +1,174 @@ +use proc_macro::TokenStream; +use proc_macro2::TokenStream as TokenStream2; +use quote::quote; +use syn::{ + parse_macro_input, spanned::Spanned, FnArg, ImplItem, ItemImpl, ReturnType, Signature, Type, + TypePath, +}; + +macro_rules! err { + ($($tt:tt)*) => { + return syn::Error::new($($tt)*).to_compile_error().into() + }; +} + +#[proc_macro_attribute] +pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let item2 = item.clone(); + let parsed_input = parse_macro_input!(item2 as ItemImpl); + + let self_ty = match *parsed_input.self_ty { + Type::Path(p) => p, + _ => err!( + parsed_input.span(), + "Implementing the trait `RPCService` on this type is unsupported" + ), + }; + + let methods = match parse_struct_methods(&self_ty, parsed_input.items) { + Ok(res) => res, + Err(err) => return err.to_compile_error().into(), + }; + + let mut method_idents = vec![]; + for method in methods.iter() { + method_idents.push(method.ident.clone()); + if method.inputs.len() != 2 { + err!( + method.span(), + "requires `&self` and a parameter of type `serde_json::Value`" + ); + } + + if let Err(err) = validate_method(method) { + return err.to_compile_error().into(); + } + } + + let impl_methods: Vec<TokenStream2> = method_idents.iter().map( + |m| quote! { + stringify!(#m) => Some(Box::new(move |params: serde_json::Value| Box::pin(self.#m(params)))), + }, + ).collect(); + + let item: TokenStream2 = item.into(); + quote! { + impl karyon_jsonrpc::RPCService for #self_ty { + fn get_method<'a>( + &'a self, + name: &'a str + ) -> Option<karyon_jsonrpc::RPCMethod> { + match name { + #(#impl_methods)* + _ => None, + } + } + fn name(&self) -> String{ + stringify!(#self_ty).to_string() + } + } + #item + } + .into() +} + +#[proc_macro_attribute] +pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let item2 = item.clone(); + let parsed_input = parse_macro_input!(item2 as ItemImpl); + + let self_ty = match *parsed_input.self_ty { + Type::Path(p) => p, + _ => err!( + parsed_input.span(), + "Implementing the trait `PubSubRPCService` on this type is unsupported" + ), + }; + + let methods = match parse_struct_methods(&self_ty, parsed_input.items) { + Ok(res) => res, + Err(err) => return err.to_compile_error().into(), + }; + + let mut method_idents = vec![]; + for method in methods.iter() { + method_idents.push(method.ident.clone()); + if method.inputs.len() != 4 { + err!(method.span(), "requires `&self` and three parameters: `Arc<Channel>`, method: `String`, and `serde_json::Value`"); + } + if let Err(err) = validate_method(method) { + return err.to_compile_error().into(); + } + } + + let impl_methods: Vec<TokenStream2> = method_idents.iter().map( + |m| quote! { + stringify!(#m) => { + Some(Box::new( + move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| { + Box::pin(self.#m(chan, method, params)) + })) + }, + }, + ).collect(); + + let item: TokenStream2 = item.into(); + quote! { + impl karyon_jsonrpc::PubSubRPCService for #self_ty { + fn get_pubsub_method<'a>( + &'a self, + name: &'a str + ) -> Option<karyon_jsonrpc::PubSubRPCMethod> { + match name { + #(#impl_methods)* + _ => None, + } + } + + fn name(&self) -> String{ + stringify!(#self_ty).to_string() + } + } + #item + } + .into() +} + +fn parse_struct_methods( + self_ty: &TypePath, + items: Vec<ImplItem>, +) -> Result<Vec<Signature>, syn::Error> { + let mut methods: Vec<Signature> = vec![]; + + if items.is_empty() { + return Err(syn::Error::new( + self_ty.span(), + "At least one method should be implemented", + )); + } + + for item in items { + match item { + ImplItem::Fn(method) => { + methods.push(method.sig); + } + _ => return Err(syn::Error::new(item.span(), "Unexpected item!")), + } + } + + Ok(methods) +} + +fn validate_method(method: &Signature) -> Result<(), syn::Error> { + if let FnArg::Typed(_) = method.inputs[0] { + return Err(syn::Error::new(method.span(), "requires `&self` parameter")); + } + + if let ReturnType::Default = method.output { + return Err(syn::Error::new( + method.span(), + "requires `Result<serde_json::Value, RPCError>` as return type", + )); + } + Ok(()) +} diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs deleted file mode 100644 index ecca5b1..0000000 --- a/jsonrpc/jsonrpc_macro/src/lib.rs +++ /dev/null @@ -1,85 +0,0 @@ -use proc_macro::TokenStream; -use proc_macro2::{Ident, TokenStream as TokenStream2}; -use quote::quote; -use syn::{parse_macro_input, spanned::Spanned, ImplItem, ItemImpl, Type}; - -macro_rules! err { - ($($tt:tt)*) => { - return syn::Error::new($($tt)*).to_compile_error().into() - }; -} - -#[proc_macro_attribute] -pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { - let mut methods: Vec<Ident> = vec![]; - - let item2 = item.clone(); - let parsed_input = parse_macro_input!(item2 as ItemImpl); - - let self_ty = match *parsed_input.self_ty { - Type::Path(p) => p, - _ => err!( - parsed_input.span(), - "implementing the trait `RPCService` on this type is unsupported" - ), - }; - - if parsed_input.items.is_empty() { - err!(self_ty.span(), "At least one method should be implemented"); - } - - for item in parsed_input.items { - match item { - ImplItem::Fn(method) => { - methods.push(method.sig.ident); - } - _ => err!(item.span(), "unexpected item"), - } - } - - let item2: TokenStream2 = item.into(); - let quoted = quote! { - karyon_jsonrpc::impl_rpc_service!(#self_ty, #(#methods),*); - #item2 - }; - - quoted.into() -} - -// TODO remove duplicate code -#[proc_macro_attribute] -pub fn rpc_pubsub_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { - let mut methods: Vec<Ident> = vec![]; - - let item2 = item.clone(); - let parsed_input = parse_macro_input!(item2 as ItemImpl); - - let self_ty = match *parsed_input.self_ty { - Type::Path(p) => p, - _ => err!( - parsed_input.span(), - "implementing the trait `RPCService` on this type is unsupported" - ), - }; - - if parsed_input.items.is_empty() { - err!(self_ty.span(), "At least one method should be implemented"); - } - - for item in parsed_input.items { - match item { - ImplItem::Fn(method) => { - methods.push(method.sig.ident); - } - _ => err!(item.span(), "unexpected item"), - } - } - - let item2: TokenStream2 = item.into(); - let quoted = quote! { - karyon_jsonrpc::impl_pubsub_rpc_service!(#self_ty, #(#methods),*); - #item2 - }; - - quoted.into() -} diff --git a/jsonrpc/src/client/mod.rs b/jsonrpc/src/client/mod.rs index b54656b..360a094 100644 --- a/jsonrpc/src/client/mod.rs +++ b/jsonrpc/src/client/mod.rs @@ -180,6 +180,7 @@ impl Client { Ok(()) } + /// Initializes a new [`Client`] from the provided [`ClientConfig`]. async fn init(config: ClientConfig) -> Result<Arc<Self>> { let client = Arc::new(Client { disconnect: AtomicBool::new(false), diff --git a/jsonrpc/src/server/channel.rs b/jsonrpc/src/server/channel.rs index 36896b4..7eff66f 100644 --- a/jsonrpc/src/server/channel.rs +++ b/jsonrpc/src/server/channel.rs @@ -52,7 +52,7 @@ impl Subscription { } /// Checks from the partent if this subscription is still subscribed - pub async fn still_subscribed(&self) -> bool { + async fn still_subscribed(&self) -> bool { match self.parent.upgrade() { Some(parent) => parent.subs.lock().await.contains(&self.id), None => false, @@ -93,7 +93,8 @@ impl Channel { subs.remove(i); } - pub fn close(&self) { + /// Closes the [`Channel`] + pub(crate) fn close(&self) { self.chan.close(); } } diff --git a/jsonrpc/src/server/mod.rs b/jsonrpc/src/server/mod.rs index e2c3e3a..50a4a8e 100644 --- a/jsonrpc/src/server/mod.rs +++ b/jsonrpc/src/server/mod.rs @@ -64,20 +64,21 @@ pub struct Server { } impl Server { - /// Returns the local endpoint. - pub fn local_endpoint(&self) -> Result<Endpoint> { - self.listener.local_endpoint().map_err(Error::from) - } - - /// Starts the RPC server + /// Starts the RPC server. This will spawn a new task for the main accept loop, + /// which listens for incoming connections. pub fn start(self: &Arc<Self>) { - let on_complete = |result: TaskResult<Result<()>>| async move { - if let TaskResult::Completed(Err(err)) = result { - error!("Accept loop stopped: {err}"); + // Handle the completion of the accept loop task + let on_complete = { + let this = self.clone(); + |result: TaskResult<Result<()>>| async move { + if let TaskResult::Completed(Err(err)) = result { + error!("Main accept loop stopped: {err}"); + this.shutdown().await; + } } }; - // Spawns a new task for each new incoming connection + // Spawns a new task for the main accept loop self.task_group.spawn( { let this = self.clone(); @@ -100,6 +101,11 @@ impl Server { ); } + /// Returns the local endpoint. + pub fn local_endpoint(&self) -> Result<Endpoint> { + self.listener.local_endpoint().map_err(Error::from) + } + /// Shuts down the RPC server pub async fn shutdown(&self) { self.task_group.cancel().await; @@ -335,6 +341,7 @@ impl Server { response } + /// Initializes a new [`Server`] from the provided [`ServerConfig`] async fn init(config: ServerConfig, ex: Option<Executor>) -> Result<Arc<Self>> { let task_group = match ex { Some(ex) => TaskGroup::with_executor(ex), diff --git a/jsonrpc/src/server/pubsub_service.rs b/jsonrpc/src/server/pubsub_service.rs index a6b4c11..909b0b0 100644 --- a/jsonrpc/src/server/pubsub_service.rs +++ b/jsonrpc/src/server/pubsub_service.rs @@ -15,56 +15,3 @@ pub trait PubSubRPCService: Sync + Send { fn get_pubsub_method<'a>(&'a self, name: &'a str) -> Option<PubSubRPCMethod>; fn name(&self) -> String; } - -/// Implements the [`PubSubRPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, RPCError> { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result<Value, RPCError> { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// impl_rpc_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! impl_pubsub_rpc_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::PubSubRPCService for $t { - fn get_pubsub_method<'a>( - &'a self, - name: &'a str - ) -> Option<karyon_jsonrpc::PubSubRPCMethod> { - match name { - $( - stringify!($m) => { - Some(Box::new( - move |chan: std::sync::Arc<karyon_jsonrpc::Channel>, method: String, params: serde_json::Value| { - Box::pin(self.$m(chan, method, params)) - })) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} diff --git a/jsonrpc/src/server/service.rs b/jsonrpc/src/server/service.rs index 9cc1d21..787da86 100644 --- a/jsonrpc/src/server/service.rs +++ b/jsonrpc/src/server/service.rs @@ -12,53 +12,3 @@ pub trait RPCService: Sync + Send { fn get_method<'a>(&'a self, name: &'a str) -> Option<RPCMethod>; fn name(&self) -> String; } - -/// Implements the [`RPCService`] trait for a provided type. -/// -/// # Example -/// -/// ``` -/// use serde_json::Value; -/// -/// use karyon_jsonrpc::{RPCError, impl_rpc_service}; -/// -/// struct Hello {} -/// -/// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, RPCError> { -/// Ok(serde_json::json!("foo!")) -/// } -/// -/// async fn bar(&self, params: Value) -> Result<Value, RPCError> { -/// Ok(serde_json::json!("bar!")) -/// } -/// } -/// -/// impl_rpc_service!(Hello, foo, bar); -/// -/// ``` -#[macro_export] -macro_rules! impl_rpc_service { - ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { - fn get_method<'a>( - &'a self, - name: &'a str - ) -> Option<karyon_jsonrpc::RPCMethod> { - match name { - $( - stringify!($m) => { - Some(Box::new(move |params: serde_json::Value| Box::pin(self.$m(params)))) - } - )* - _ => None, - } - - - } - fn name(&self) -> String{ - stringify!($t).to_string() - } - } - }; -} diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs deleted file mode 100644 index 52c5c31..0000000 --- a/jsonrpc/tests/impl_rpc_service.rs +++ /dev/null @@ -1,30 +0,0 @@ -use karyon_jsonrpc::{impl_rpc_service, RPCError, RPCService}; -use serde_json::Value; - -#[test] -fn service() { - struct Foo {} - - impl Foo { - async fn foo(&self, params: Value) -> Result<Value, RPCError> { - Ok(params) - } - } - - impl_rpc_service!(Foo, foo); - - let f = Foo {}; - - assert!(f.get_method("foo").is_some()); - assert!(f.get_method("bar").is_none()); - - let params = serde_json::json!("params"); - - smol::block_on(async { - let foo_method = f.get_method("foo").expect("Get method foo"); - assert_eq!( - foo_method(params.clone()).await.expect("Call foo method"), - params - ); - }); -} diff --git a/jsonrpc/tests/rpc_pubsub_impl.rs b/jsonrpc/tests/rpc_pubsub_impl.rs new file mode 100644 index 0000000..9d2eb57 --- /dev/null +++ b/jsonrpc/tests/rpc_pubsub_impl.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use karyon_jsonrpc::{rpc_pubsub_impl, Channel, PubSubRPCService, RPCError}; +use serde_json::Value; + +#[test] +fn rpc_pubsub_impl_service() { + struct Foo {} + + #[rpc_pubsub_impl] + impl Foo { + async fn foo( + &self, + _channel: Arc<Channel>, + _method: String, + params: Value, + ) -> Result<Value, RPCError> { + Ok(params) + } + } + + let f = Arc::new(Foo {}); + + assert!(f.get_pubsub_method("foo").is_some()); + assert!(f.get_pubsub_method("bar").is_none()); + + let _params = serde_json::json!("params"); + + // TODO add more tests here +} |