diff options
author | hozan23 <hozan23@karyontech.net> | 2024-04-11 10:19:20 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-05-19 13:51:30 +0200 |
commit | 0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch) | |
tree | 961d73218af672797d49f899289bef295bc56493 | |
parent | a69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff) |
add support for tokio & improve net crate api
87 files changed, 3339 insertions, 1567 deletions
@@ -3,10 +3,25 @@ version = 3 [[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -28,47 +43,48 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", + "is_terminal_polyfill", "utf8parse", ] [[package]] name = "anstyle" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" +checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" [[package]] name = "anstyle-parse" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -98,7 +114,7 @@ checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", "synstructure", ] @@ -110,7 +126,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] @@ -126,40 +142,39 @@ dependencies = [ [[package]] name = "async-channel" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +checksum = "9f2776ead772134d55b62dd45e59a79e21612d85d0af729b8b7d3967d601a62a" dependencies = [ "concurrent-queue", - "event-listener 5.2.0", - "event-listener-strategy 0.5.0", + "event-listener 5.3.0", + "event-listener-strategy 0.5.2", "futures-core", "pin-project-lite", ] [[package]] name = "async-executor" -version = "1.8.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +checksum = "b10202063978b3351199d68f8b22c4e47e4b1b822f8d43fd862d5ea8c006b29a" dependencies = [ - "async-lock 3.3.0", "async-task", "concurrent-queue", - "fastrand 2.0.1", - "futures-lite 2.2.0", + "fastrand 2.1.0", + "futures-lite 2.3.0", "slab", ] [[package]] name = "async-fs" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc19683171f287921f2405677dd2ed2549c3b3bda697a563ebc3a121ace2aba1" +checksum = "ebcd09b382f40fcd159c2d695175b2ae620ffa5f3bd6f664131efff4e8b9e04a" dependencies = [ "async-lock 3.3.0", "blocking", - "futures-lite 2.2.0", + "futures-lite 2.3.0", ] [[package]] @@ -168,12 +183,12 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ - "async-channel 2.2.0", + "async-channel 2.3.0", "async-executor", "async-io 2.3.2", "async-lock 3.3.0", "blocking", - "futures-lite 2.2.0", + "futures-lite 2.3.0", "once_cell", ] @@ -193,7 +208,7 @@ dependencies = [ "polling 2.8.0", "rustix 0.37.27", "slab", - "socket2", + "socket2 0.4.10", "waker-fn", ] @@ -207,10 +222,10 @@ dependencies = [ "cfg-if", "concurrent-queue", "futures-io", - "futures-lite 2.2.0", + "futures-lite 2.3.0", "parking", - "polling 3.5.0", - "rustix 0.38.31", + "polling 3.7.0", + "rustix 0.38.34", "slab", "tracing", "windows-sys 0.52.0", @@ -244,43 +259,45 @@ checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" dependencies = [ "async-io 2.3.2", "blocking", - "futures-lite 2.2.0", + "futures-lite 2.3.0", ] [[package]] name = "async-process" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "451e3cf68011bd56771c79db04a9e333095ab6349f7e47592b788e9b98720cc8" +checksum = "a53fc6301894e04a92cb2584fedde80cb25ba8e02d9dc39d4a87d036e22f397d" dependencies = [ - "async-channel 2.2.0", + "async-channel 2.3.0", "async-io 2.3.2", "async-lock 3.3.0", "async-signal", + "async-task", "blocking", "cfg-if", - "event-listener 5.2.0", - "futures-lite 2.2.0", - "rustix 0.38.31", + "event-listener 5.3.0", + "futures-lite 2.3.0", + "rustix 0.38.34", + "tracing", "windows-sys 0.52.0", ] [[package]] name = "async-signal" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" +checksum = "afe66191c335039c7bb78f99dc7520b0cbb166b3a1cb33a03f53d8a1c6f2afda" dependencies = [ "async-io 2.3.2", - "async-lock 2.8.0", + "async-lock 3.3.0", "atomic-waker", "cfg-if", "futures-core", "futures-io", - "rustix 0.38.31", + "rustix 0.38.34", "signal-hook-registry", "slab", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -311,43 +328,47 @@ dependencies = [ [[package]] name = "async-task" -version = "4.7.0" +version = "4.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] name = "async-tungstenite" -version = "0.25.0" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef0f8d64ef9351752fbe5462f242c625d9c4910d2bc3f7ec44c43857ca123f5d" +checksum = "2cca750b12e02c389c1694d35c16539f88b8bbaa5945934fdc1b41a776688589" dependencies = [ + "async-std", "futures-io", "futures-util", "log", "pin-project-lite", + "tokio", "tungstenite", ] [[package]] -name = "async_io_stream" -version = "0.3.3" +name = "asynchronous-codec" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" dependencies = [ - "futures", - "pharos", - "rustc_version", + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", ] [[package]] @@ -358,15 +379,15 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.1.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-lc-rs" -version = "1.6.2" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33e4a55b03f8780ba55041bc7be91a2a8ec8c03517b0379d2d6c96d2c30d95" +checksum = "8487b59d62764df8231cb371c459314df895b41756df457a1fb1243d65c89195" dependencies = [ "aws-lc-sys", "mirai-annotations", @@ -377,11 +398,12 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.13.3" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ede3d6e360a48436fee127cb81710834407b1ec0c48a001cc29dec9005f73e" +checksum = "c15eb61145320320eb919d9bab524617a7aa4216c78d342fae3a758bc33073e4" dependencies = [ "bindgen", + "cc", "cmake", "dunce", "fs_extra", @@ -390,10 +412,25 @@ dependencies = [ ] [[package]] +name = "backtrace" +version = "0.3.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] name = "base64" -version = "0.21.7" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" @@ -426,7 +463,7 @@ version = "0.69.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cexpr", "clang-sys", "itertools", @@ -439,7 +476,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn", + "syn 2.0.63", "which", ] @@ -451,9 +488,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" [[package]] name = "block-buffer" @@ -466,25 +503,23 @@ dependencies = [ [[package]] name = "blocking" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +checksum = "495f7104e962b7356f0aeb34247aca1fe7d2e783b346582db7f2904cb5717e88" dependencies = [ - "async-channel 2.2.0", + "async-channel 2.3.0", "async-lock 3.3.0", "async-task", - "fastrand 2.0.1", "futures-io", - "futures-lite 2.2.0", + "futures-lite 2.3.0", "piper", - "tracing", ] [[package]] name = "bumpalo" -version = "3.15.4" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "byteorder" @@ -494,15 +529,20 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cc" -version = "1.0.90" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" +dependencies = [ + "jobserver", + "libc", + "once_cell", +] [[package]] name = "cexpr" @@ -527,16 +567,16 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" [[package]] name = "chrono" -version = "0.4.35" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -552,9 +592,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.2" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", "clap_derive", @@ -574,14 +614,14 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.0" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] @@ -601,15 +641,15 @@ dependencies = [ [[package]] name = "colorchoice" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" [[package]] name = "concurrent-queue" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -686,20 +726,20 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] name = "data-encoding" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "der" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" dependencies = [ "const-oid", "zeroize", @@ -767,7 +807,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] @@ -809,9 +849,9 @@ dependencies = [ [[package]] name = "either" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" [[package]] name = "env_filter" @@ -838,9 +878,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", "windows-sys 0.52.0", @@ -865,9 +905,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24" dependencies = [ "concurrent-queue", "parking", @@ -886,11 +926,11 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.5.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" dependencies = [ - "event-listener 5.2.0", + "event-listener 5.3.0", "pin-project-lite", ] @@ -905,15 +945,15 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "fiat-crypto" -version = "0.2.6" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "fnv" @@ -937,28 +977,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] name = "futures-channel" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -968,17 +992,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1001,11 +1014,11 @@ dependencies = [ [[package]] name = "futures-lite" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-core", "futures-io", "parking", @@ -1020,7 +1033,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] @@ -1030,7 +1043,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d8a2499f0fecc0492eb3e47eab4e92da7875e1028ad2528f214ac3346ca04e" dependencies = [ "futures-io", - "rustls", + "rustls 0.22.4", "rustls-pki-types", ] @@ -1052,7 +1065,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1076,9 +1088,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.12" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", @@ -1086,6 +1098,12 @@ dependencies = [ ] [[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] name = "glob" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1105,9 +1123,9 @@ dependencies = [ [[package]] name = "heck" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -1201,6 +1219,12 @@ dependencies = [ ] [[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + +[[package]] name = "itertools" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1211,9 +1235,18 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.10" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "jobserver" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +dependencies = [ + "libc", +] [[package]] name = "js-sys" @@ -1228,56 +1261,94 @@ dependencies = [ name = "karyon_core" version = "0.1.0" dependencies = [ - "async-lock 3.3.0", + "async-channel 2.3.0", "async-process", - "async-task", "bincode", "chrono", "dirs", "ed25519-dalek", "log", + "once_cell", "pin-project-lite", "rand", "smol", "thiserror", + "tokio", ] [[package]] name = "karyon_jsonrpc" version = "0.1.0" dependencies = [ + "async-trait", + "async-tungstenite", "env_logger", + "futures-rustls", "karyon_core", + "karyon_jsonrpc_internal", + "karyon_jsonrpc_macro", "karyon_net", "log", "memchr", "rand", + "rcgen 0.13.1", + "rustls-pemfile", "serde", "serde_json", "smol", "thiserror", + "tokio-rustls", + "webpki-roots", +] + +[[package]] +name = "karyon_jsonrpc_internal" +version = "0.1.0" +dependencies = [ + "karyon_core", + "karyon_net", + "serde_json", + "thiserror", +] + +[[package]] +name = "karyon_jsonrpc_macro" +version = "0.1.0" +dependencies = [ + "karyon_jsonrpc_internal", + "proc-macro2", + "quote", + "serde_json", + "syn 1.0.109", ] [[package]] name = "karyon_net" version = "0.1.0" dependencies = [ + "async-channel 2.3.0", "async-trait", "async-tungstenite", + "asynchronous-codec", "bincode", "futures-rustls", + "futures-util", "karyon_core", "log", + "pin-project-lite", + "rustls-pki-types", "smol", "thiserror", + "tokio", + "tokio-rustls", "url", - "ws_stream_tungstenite", ] [[package]] name = "karyon_p2p" version = "0.1.0" dependencies = [ + "async-channel 2.3.0", "async-std", "async-trait", "bincode", @@ -1292,11 +1363,13 @@ dependencies = [ "karyon_net", "log", "rand", - "rcgen", + "rcgen 0.12.1", + "rustls-pki-types", "semver", "sha2", "smol", "thiserror", + "tokio-rustls", "x509-parser", "yasna", ] @@ -1324,9 +1397,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" [[package]] name = "libloading" @@ -1335,18 +1408,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] name = "libredox" -version = "0.0.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "libc", - "redox_syscall", ] [[package]] @@ -1362,6 +1434,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] name = "log" version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1372,9 +1454,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "minimal-lexical" @@ -1383,6 +1465,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] +name = "miniz_oxide" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] + +[[package]] name = "mirai-annotations" version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1394,7 +1496,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "cfg_aliases", "libc", @@ -1412,11 +1514,10 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" dependencies = [ - "autocfg", "num-integer", "num-traits", ] @@ -1438,14 +1539,33 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", ] [[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] name = "oid-registry" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1473,16 +1593,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] +name = "parking_lot" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.5", +] + +[[package]] name = "paste" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "pem" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" +checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" dependencies = [ "base64", "serde", @@ -1495,20 +1638,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] -name = "pharos" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" -dependencies = [ - "futures", - "rustc_version", -] - -[[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -1518,12 +1651,12 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "piper" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +checksum = "464db0c665917b13ebb5d453ccdec4add5658ee1adc7affc7677615356a8afaf" dependencies = [ "atomic-waker", - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-io", ] @@ -1539,9 +1672,9 @@ dependencies = [ [[package]] name = "platforms" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "626dec3cac7cc0e1577a2ec3fc496277ec2baa084bebad95bb6fdbfae235f84c" +checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" [[package]] name = "polling" @@ -1561,14 +1694,15 @@ dependencies = [ [[package]] name = "polling" -version = "3.5.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24f040dee2588b4963afb4e420540439d126f73fdacf4a9c486a96d840bac3c9" +checksum = "645493cf344456ef24219d02a768cf1fb92ddf8c92161679ae3d91b91a637be3" dependencies = [ "cfg-if", "concurrent-queue", + "hermit-abi", "pin-project-lite", - "rustix 0.38.31", + "rustix 0.38.34", "tracing", "windows-sys 0.52.0", ] @@ -1587,28 +1721,28 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.2.16" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.63", ] [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -1656,19 +1790,32 @@ dependencies = [ ] [[package]] +name = "rcgen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779" +dependencies = [ + "pem", + "ring", + "rustls-pki-types", + "time", + "yasna", +] + +[[package]] name = "redox_syscall" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.5.0", ] [[package]] name = "redox_users" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" +checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" dependencies = [ "getrandom", "libredox", @@ -1677,9 +1824,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -1700,9 +1847,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "ring" @@ -1720,6 +1867,12 @@ dependencies = [ ] [[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] name = "rustc-hash" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1759,11 +1912,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys 0.4.13", @@ -1772,9 +1925,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "aws-lc-rs", "ring", @@ -1785,16 +1938,41 @@ dependencies = [ ] [[package]] +name = "rustls" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afabcee0551bd1aa3e18e5adbf2c0544722014b899adb31bd186ec638d3da97e" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] name = "rustls-pki-types" -version = "1.3.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.102.2" +version = "0.102.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" dependencies = [ "aws-lc-rs", "ring", @@ -1804,41 +1982,47 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "semver" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.197" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.197" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] name = "serde_json" -version = "1.0.114" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", @@ -1875,9 +2059,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" dependencies = [ "libc", ] @@ -1901,12 +2085,18 @@ dependencies = [ ] [[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] name = "smol" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e635339259e51ef85ac7aa29a1cd991b957047507288697a690e80ab97d07cad" dependencies = [ - "async-channel 2.2.0", + "async-channel 2.3.0", "async-executor", "async-fs", "async-io 2.3.2", @@ -1914,7 +2104,7 @@ dependencies = [ "async-net", "async-process", "blocking", - "futures-lite 2.2.0", + "futures-lite 2.3.0", ] [[package]] @@ -1928,6 +2118,16 @@ dependencies = [ ] [[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1945,9 +2145,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "subtle" @@ -1957,9 +2157,20 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" -version = "2.0.52" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf5be731623ca1a1fb7d8be6f261a3be6d3e2337b8a1f97be944d020c8fcb704" dependencies = [ "proc-macro2", "quote", @@ -1974,34 +2185,34 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] name = "thiserror" -version = "1.0.58" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" +checksum = "579e9083ca58dd9dcf91a9923bb9054071b9ebbd800b342194c9feb0ee89fc18" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.58" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" +checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", ] [[package]] name = "time" -version = "0.3.34" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", @@ -2020,9 +2231,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ "num-conv", "time-core", @@ -2044,25 +2255,54 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] -name = "tracing" -version = "0.1.40" +name = "tokio" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", "pin-project-lite", - "tracing-attributes", - "tracing-core", + "signal-hook-registry", + "socket2 0.5.7", + "tokio-macros", + "windows-sys 0.48.0", ] [[package]] -name = "tracing-attributes" -version = "0.1.27" +name = "tokio-macros" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.5", + "rustls-pki-types", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", ] [[package]] @@ -2070,9 +2310,6 @@ name = "tracing-core" version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" -dependencies = [ - "once_cell", -] [[package]] name = "tungstenite" @@ -2157,9 +2394,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "value-bag" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec26a25bd6fca441cdd0f769fd7f891bae119f996de31f86a5eddccef54c1d" +checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" [[package]] name = "version_check" @@ -2206,7 +2443,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.63", "wasm-bindgen-shared", ] @@ -2240,7 +2477,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.63", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2262,6 +2499,15 @@ dependencies = [ ] [[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + +[[package]] name = "which" version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2270,7 +2516,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.31", + "rustix 0.38.34", ] [[package]] @@ -2301,7 +2547,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -2319,7 +2565,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -2339,17 +2585,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -2360,9 +2607,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -2372,9 +2619,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -2384,9 +2631,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -2396,9 +2649,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -2408,9 +2661,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -2420,9 +2673,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -2432,28 +2685,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" - -[[package]] -name = "ws_stream_tungstenite" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a198f414f083fb19fcc1bffcb0fa0cf46d33ccfa229adf248cac12c180e91609" -dependencies = [ - "async-tungstenite", - "async_io_stream", - "bitflags 2.4.2", - "futures-core", - "futures-io", - "futures-sink", - "futures-util", - "pharos", - "rustc_version", - "tracing", - "tungstenite", -] +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "x509-parser" @@ -2,20 +2,14 @@ resolver = "2" # Please ensure that each crate comes before any other crate that depends on it -members = [ - "core", - "net", - "p2p", - "jsonrpc", -] +members = ["core", "net", "p2p", "jsonrpc"] [workspace.package] version = "0.1.0" edition = "2021" [workspace.dependencies] -karyon_core = { path = "core" } -karyon_net = { path = "net" } -karyon_p2p = { path = "p2p" } -karyon_jsonrpc = { path = "jsonrpc" } - +karyon_core = { path = "core", default-features = false } +karyon_net = { path = "net", default-features = false } +karyon_jsonrpc = { path = "jsonrpc", default-features = false } +karyon_p2p = { path = "p2p", default-features = false } @@ -1,18 +1,34 @@ -# karyon +# Karyon -An infrastructure for peer-to-peer, decentralized, and collaborative software. +## Overview + +Many developers around the world aspire to build peer-to-peer, decentralized +apps that are resilient, secure, and free from central control. +However, there are still not many libraries and tools available to build these +kinds of apps. This forces many developers to either abandon their ideas or +develop a new p2p network stack and tools from scratch. Such efforts are not +only time-consuming but also prone to errors and security vulnerabilities, as +each new implementation reintroduces potential weaknesses. + +Karyon provides developers with the components and tools needed to create +decentralized apps. By offering a robust infrastructure, Karyon simplifies the +complexities associated with building p2p apps. Karyon's primary goal +is to make the process of decentralization more achievable and efficient for +developers everywhere, pushing for a future where software is more open, +secure, and free from central control. > In molecular biology, a Karyon is essentially "a part of the cell > containing DNA and RNA and responsible for growth and reproduction" -Join us on: +## Join us +- [irc](irc://irc.libera.chat/#karyon) #karyon on Libera Chat - [Discord](https://discord.gg/xuXRcrkz3p) ## Crates - [karyon core](./core): Essential utilities and core functionality. -- [karyon net](./net): Provides a network interface for TCP, UDP, and Unix, +- [karyon net](./net): Provides a network interface for TCP, UDP, WebSocket, and Unix, along with common network functionality. - [karyon p2p](./p2p): A lightweight, extensible, and customizable peer-to-peer (p2p) network stack. @@ -20,14 +36,13 @@ Join us on: [JSONRPC2.0](https://www.jsonrpc.org/specification) implementation. - karyon crdt: A [CRDT](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) implementation for building collaborative software. -- karyon base: A lightweight, extensible database that operates with karyon crdt. +- karyon base: A lightweight, extensible database that operates with `karyon crdt`. -## Status +## Choosing the async runtime -This project is a work in progress. The current focus is on shipping `karyon -crdt` and `karyon base`, along with major changes to the network stack. You can -check the [issues](https://github.com/karyontech/karyon/issues) for updates on -ongoing tasks. +All the crates support both smol(async-std) and tokio. The default is smol, but +if you want to use tokio, you need to disable the default features and then +select the `tokio` feature. ## Docs @@ -39,11 +54,12 @@ For the internal crates: [karyon_core](https://karyontech.github.io/karyon/karyon_core), [karyon_net](https://karyontech.github.io/karyon/karyon_net) -## Thanks +## Status -Big thanks to [Ink & Switch](https://www.inkandswitch.com/) team, -[smol](https://github.com/smol-rs/smol) async runtime, and -[zmq.rs](https://github.com/zeromq/zmq.rs) for the inspiration!. +This project is a work in progress. The current focus is on shipping `karyon +crdt` and `karyon base`, along with major changes to the network stack. You can +check the [issues](https://github.com/karyontech/karyon/issues) for updates on +ongoing tasks. ## Contribution diff --git a/core/Cargo.toml b/core/Cargo.toml index c8e2b8d..4bb7f4f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,13 +1,15 @@ [package] name = "karyon_core" -version.workspace = true +version.workspace = true edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["smol"] +crypto = ["dep:ed25519-dalek"] +tokio = ["dep:tokio"] +smol = ["dep:smol", "dep:async-process"] [dependencies] -smol = "2.0.0" pin-project-lite = "0.2.13" log = "0.4.21" bincode = "2.0.0-rc.3" @@ -15,15 +17,15 @@ chrono = "0.4.35" rand = "0.8.5" thiserror = "1.0.58" dirs = "5.0.1" -async-task = "4.7.0" -async-lock = "3.3.0" -async-process = "2.1.0" - -ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true} +async-channel = "2.2.0" +# crypto feature deps +ed25519-dalek = { version = "2.1.1", features = ["rand_core"], optional = true } -[features] -default = [] -crypto = ["dep:ed25519-dalek"] - +# smol feature deps +async-process = { version = "2.1.0", optional = true } +smol = { version = "2.0.0", optional = true } +# tokio feature deps +tokio = { version = "1.37.0", features = ["full"], optional = true } +once_cell = "1.19.0" diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs new file mode 100644 index 0000000..9335f12 --- /dev/null +++ b/core/src/async_runtime/executor.rs @@ -0,0 +1,100 @@ +use std::{future::Future, panic::catch_unwind, sync::Arc, thread}; + +use once_cell::sync::OnceCell; + +#[cfg(feature = "smol")] +pub use smol::Executor as SmolEx; + +#[cfg(feature = "tokio")] +pub use tokio::runtime::Runtime; + +use super::Task; + +#[derive(Clone)] +pub struct Executor { + #[cfg(feature = "smol")] + inner: Arc<SmolEx<'static>>, + #[cfg(feature = "tokio")] + inner: Arc<Runtime>, +} + +impl Executor { + pub fn spawn<T: Send + 'static>( + &self, + future: impl Future<Output = T> + Send + 'static, + ) -> Task<T> { + self.inner.spawn(future).into() + } +} + +static GLOBAL_EXECUTOR: OnceCell<Executor> = OnceCell::new(); + +/// Returns a single-threaded global executor +pub fn global_executor() -> Executor { + #[cfg(feature = "smol")] + fn init_executor() -> Executor { + let ex = smol::Executor::new(); + thread::Builder::new() + .name("smol-executor".to_string()) + .spawn(|| loop { + catch_unwind(|| { + smol::block_on(global_executor().inner.run(std::future::pending::<()>())) + }) + .ok(); + }) + .expect("cannot spawn executor thread"); + // Prevent spawning another thread by running the process driver on this + // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs + ex.spawn(async_process::driver()).detach(); + Executor { + inner: Arc::new(ex), + } + } + + #[cfg(feature = "tokio")] + fn init_executor() -> Executor { + let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime")); + let ex_cloned = ex.clone(); + thread::Builder::new() + .name("tokio-executor".to_string()) + .spawn(move || { + catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok(); + }) + .expect("cannot spawn tokio runtime thread"); + Executor { inner: ex } + } + + GLOBAL_EXECUTOR.get_or_init(init_executor).clone() +} + +#[cfg(feature = "smol")] +impl From<Arc<smol::Executor<'static>>> for Executor { + fn from(ex: Arc<smol::Executor<'static>>) -> Executor { + Executor { inner: ex } + } +} + +#[cfg(feature = "tokio")] +impl From<Arc<tokio::runtime::Runtime>> for Executor { + fn from(rt: Arc<tokio::runtime::Runtime>) -> Executor { + Executor { inner: rt } + } +} + +#[cfg(feature = "smol")] +impl From<smol::Executor<'static>> for Executor { + fn from(ex: smol::Executor<'static>) -> Executor { + Executor { + inner: Arc::new(ex), + } + } +} + +#[cfg(feature = "tokio")] +impl From<tokio::runtime::Runtime> for Executor { + fn from(rt: tokio::runtime::Runtime) -> Executor { + Executor { + inner: Arc::new(rt), + } + } +} diff --git a/core/src/async_runtime/io.rs b/core/src/async_runtime/io.rs new file mode 100644 index 0000000..161c258 --- /dev/null +++ b/core/src/async_runtime/io.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "smol")] +pub use smol::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; + +#[cfg(feature = "tokio")] +pub use tokio::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; diff --git a/core/src/async_runtime/lock.rs b/core/src/async_runtime/lock.rs new file mode 100644 index 0000000..fc84d1d --- /dev/null +++ b/core/src/async_runtime/lock.rs @@ -0,0 +1,5 @@ +#[cfg(feature = "smol")] +pub use smol::lock::{Mutex, MutexGuard, OnceCell, RwLock}; + +#[cfg(feature = "tokio")] +pub use tokio::sync::{Mutex, MutexGuard, OnceCell, RwLock}; diff --git a/core/src/async_runtime/mod.rs b/core/src/async_runtime/mod.rs new file mode 100644 index 0000000..d91d01b --- /dev/null +++ b/core/src/async_runtime/mod.rs @@ -0,0 +1,25 @@ +mod executor; +pub mod io; +pub mod lock; +pub mod net; +mod spawn; +mod task; +mod timer; + +pub use executor::{global_executor, Executor}; +pub use spawn::spawn; +pub use task::Task; + +#[cfg(test)] +pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T { + #[cfg(feature = "smol")] + let result = smol::block_on(future); + #[cfg(feature = "tokio")] + let result = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future); + + result +} diff --git a/core/src/async_runtime/net.rs b/core/src/async_runtime/net.rs new file mode 100644 index 0000000..5c004ce --- /dev/null +++ b/core/src/async_runtime/net.rs @@ -0,0 +1,12 @@ +pub use std::os::unix::net::SocketAddr; + +#[cfg(feature = "smol")] +pub use smol::net::{ + unix::{SocketAddr as UnixSocketAddr, UnixListener, UnixStream}, + TcpListener, TcpStream, UdpSocket, +}; + +#[cfg(feature = "tokio")] +pub use tokio::net::{ + unix::SocketAddr as UnixSocketAddr, TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream, +}; diff --git a/core/src/async_runtime/spawn.rs b/core/src/async_runtime/spawn.rs new file mode 100644 index 0000000..2760982 --- /dev/null +++ b/core/src/async_runtime/spawn.rs @@ -0,0 +1,12 @@ +use std::future::Future; + +use super::Task; + +pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { + #[cfg(feature = "smol")] + let result: Task<T> = smol::spawn(future).into(); + #[cfg(feature = "tokio")] + let result: Task<T> = tokio::spawn(future).into(); + + result +} diff --git a/core/src/async_runtime/task.rs b/core/src/async_runtime/task.rs new file mode 100644 index 0000000..a681b0f --- /dev/null +++ b/core/src/async_runtime/task.rs @@ -0,0 +1,52 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::error::Error; + +pub struct Task<T> { + #[cfg(feature = "smol")] + inner_task: smol::Task<T>, + #[cfg(feature = "tokio")] + inner_task: tokio::task::JoinHandle<T>, +} + +impl<T> Task<T> { + pub async fn cancel(self) { + #[cfg(feature = "smol")] + self.inner_task.cancel().await; + #[cfg(feature = "tokio")] + self.inner_task.abort(); + } +} + +impl<T> Future for Task<T> { + type Output = Result<T, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[cfg(feature = "smol")] + let result = smol::Task::poll(Pin::new(&mut self.inner_task), cx); + #[cfg(feature = "tokio")] + let result = tokio::task::JoinHandle::poll(Pin::new(&mut self.inner_task), cx); + + #[cfg(feature = "smol")] + return result.map(Ok); + + #[cfg(feature = "tokio")] + return result.map_err(|e| e.into()); + } +} + +#[cfg(feature = "smol")] +impl<T> From<smol::Task<T>> for Task<T> { + fn from(t: smol::Task<T>) -> Task<T> { + Task { inner_task: t } + } +} + +#[cfg(feature = "tokio")] +impl<T> From<tokio::task::JoinHandle<T>> for Task<T> { + fn from(t: tokio::task::JoinHandle<T>) -> Task<T> { + Task { inner_task: t } + } +} diff --git a/core/src/async_runtime/timer.rs b/core/src/async_runtime/timer.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/core/src/async_runtime/timer.rs @@ -0,0 +1 @@ + diff --git a/core/src/async_util/backoff.rs b/core/src/async_util/backoff.rs index 4a0ab35..70e63b3 100644 --- a/core/src/async_util/backoff.rs +++ b/core/src/async_util/backoff.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use smol::Timer; +use super::sleep; /// Exponential backoff /// <https://en.wikipedia.org/wiki/Exponential_backoff> @@ -57,7 +57,7 @@ impl Backoff { /// Retruns the delay value. pub async fn sleep(&self) -> u64 { if self.stop.load(Ordering::SeqCst) { - Timer::after(Duration::from_millis(self.max_delay)).await; + sleep(Duration::from_millis(self.max_delay)).await; return self.max_delay; } @@ -71,7 +71,7 @@ impl Backoff { self.retries.store(retries + 1, Ordering::SeqCst); - Timer::after(Duration::from_millis(delay)).await; + sleep(Duration::from_millis(delay)).await; delay } @@ -84,15 +84,18 @@ impl Backoff { #[cfg(test)] mod tests { - use super::*; use std::sync::Arc; + use crate::async_runtime::{block_on, spawn}; + + use super::*; + #[test] fn test_backoff() { - smol::block_on(async move { + block_on(async move { let backoff = Arc::new(Backoff::new(5, 15)); let backoff_c = backoff.clone(); - smol::spawn(async move { + spawn(async move { let delay = backoff_c.sleep().await; assert_eq!(delay, 5); @@ -102,14 +105,16 @@ mod tests { let delay = backoff_c.sleep().await; assert_eq!(delay, 15); }) - .await; + .await + .unwrap(); - smol::spawn(async move { + spawn(async move { backoff.reset(); let delay = backoff.sleep().await; assert_eq!(delay, 5); }) - .await; + .await + .unwrap(); }); } } diff --git a/core/src/async_util/condvar.rs b/core/src/async_util/condvar.rs index d3bc15b..c3f373d 100644 --- a/core/src/async_util/condvar.rs +++ b/core/src/async_util/condvar.rs @@ -6,9 +6,7 @@ use std::{ task::{Context, Poll, Waker}, }; -use smol::lock::MutexGuard; - -use crate::util::random_16; +use crate::{async_runtime::lock::MutexGuard, util::random_16}; /// CondVar is an async version of <https://doc.rust-lang.org/std/sync/struct.Condvar.html> /// @@ -17,9 +15,8 @@ use crate::util::random_16; ///``` /// use std::sync::Arc; /// -/// use smol::lock::Mutex; -/// /// use karyon_core::async_util::CondVar; +/// use karyon_core::async_runtime::{spawn, lock::Mutex}; /// /// async { /// @@ -28,7 +25,7 @@ use crate::util::random_16; /// /// let val_cloned = val.clone(); /// let condvar_cloned = condvar.clone(); -/// smol::spawn(async move { +/// spawn(async move { /// let mut val = val_cloned.lock().await; /// /// // While the boolean flag is false, wait for a signal. @@ -40,7 +37,7 @@ use crate::util::random_16; /// }); /// /// let condvar_cloned = condvar.clone(); -/// smol::spawn(async move { +/// spawn(async move { /// let mut val = val.lock().await; /// /// // While the boolean flag is false, wait for a signal. @@ -71,7 +68,10 @@ impl CondVar { /// Blocks the current task until this condition variable receives a notification. pub async fn wait<'a, T>(&self, g: MutexGuard<'a, T>) -> MutexGuard<'a, T> { + #[cfg(feature = "smol")] let m = MutexGuard::source(&g); + #[cfg(feature = "tokio")] + let m = MutexGuard::mutex(&g); CondVarAwait::new(self, g).await; @@ -206,8 +206,6 @@ impl Wakers { #[cfg(test)] mod tests { - use super::*; - use smol::lock::Mutex; use std::{ collections::VecDeque, sync::{ @@ -216,6 +214,10 @@ mod tests { }, }; + use crate::async_runtime::{block_on, lock::Mutex, spawn}; + + use super::*; + // The tests below demonstrate a solution to a problem in the Wikipedia // explanation of condition variables: // https://en.wikipedia.org/wiki/Monitor_(synchronization)#Solving_the_bounded_producer/consumer_problem. @@ -243,7 +245,7 @@ mod tests { #[test] fn test_condvar_signal() { - smol::block_on(async { + block_on(async { let number_of_tasks = 30; let queue = Arc::new(Mutex::new(Queue::new(5))); @@ -254,7 +256,7 @@ mod tests { let condvar_full_cloned = condvar_full.clone(); let condvar_empty_cloned = condvar_empty.clone(); - let _producer1 = smol::spawn(async move { + let _producer1 = spawn(async move { for i in 1..number_of_tasks { // Lock queue mtuex let mut queue = queue_cloned.lock().await; @@ -275,7 +277,7 @@ mod tests { let queue_cloned = queue.clone(); let task_consumed = Arc::new(AtomicUsize::new(0)); let task_consumed_ = task_consumed.clone(); - let consumer = smol::spawn(async move { + let consumer = spawn(async move { for _ in 1..number_of_tasks { // Lock queue mtuex let mut queue = queue_cloned.lock().await; @@ -297,7 +299,7 @@ mod tests { } }); - consumer.await; + let _ = consumer.await; assert!(queue.lock().await.is_empty()); assert_eq!(task_consumed.load(Ordering::Relaxed), 29); }); @@ -305,7 +307,7 @@ mod tests { #[test] fn test_condvar_broadcast() { - smol::block_on(async { + block_on(async { let tasks = 30; let queue = Arc::new(Mutex::new(Queue::new(5))); @@ -313,7 +315,7 @@ mod tests { let queue_cloned = queue.clone(); let condvar_cloned = condvar.clone(); - let _producer1 = smol::spawn(async move { + let _producer1 = spawn(async move { for i in 1..tasks { // Lock queue mtuex let mut queue = queue_cloned.lock().await; @@ -333,7 +335,7 @@ mod tests { let queue_cloned = queue.clone(); let condvar_cloned = condvar.clone(); - let _producer2 = smol::spawn(async move { + let _producer2 = spawn(async move { for i in 1..tasks { // Lock queue mtuex let mut queue = queue_cloned.lock().await; @@ -355,7 +357,7 @@ mod tests { let task_consumed = Arc::new(AtomicUsize::new(0)); let task_consumed_ = task_consumed.clone(); - let consumer = smol::spawn(async move { + let consumer = spawn(async move { for _ in 1..((tasks * 2) - 1) { { // Lock queue mutex @@ -379,7 +381,7 @@ mod tests { } }); - consumer.await; + let _ = consumer.await; assert!(queue.lock().await.is_empty()); assert_eq!(task_consumed.load(Ordering::Relaxed), 58); }); diff --git a/core/src/async_util/condwait.rs b/core/src/async_util/condwait.rs index 6aa8a3c..76c6a05 100644 --- a/core/src/async_util/condwait.rs +++ b/core/src/async_util/condwait.rs @@ -1,6 +1,5 @@ -use smol::lock::Mutex; - use super::CondVar; +use crate::async_runtime::lock::Mutex; /// CondWait is a wrapper struct for CondVar with a Mutex boolean flag. /// @@ -10,11 +9,12 @@ use super::CondVar; /// use std::sync::Arc; /// /// use karyon_core::async_util::CondWait; +/// use karyon_core::async_runtime::spawn; /// /// async { /// let cond_wait = Arc::new(CondWait::new()); /// let cond_wait_cloned = cond_wait.clone(); -/// let task = smol::spawn(async move { +/// let task = spawn(async move { /// cond_wait_cloned.wait().await; /// // ... /// }); @@ -76,21 +76,24 @@ impl Default for CondWait { #[cfg(test)] mod tests { - use super::*; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; + use crate::async_runtime::{block_on, spawn}; + + use super::*; + #[test] fn test_cond_wait() { - smol::block_on(async { + block_on(async { let cond_wait = Arc::new(CondWait::new()); let count = Arc::new(AtomicUsize::new(0)); let cond_wait_cloned = cond_wait.clone(); let count_cloned = count.clone(); - let task = smol::spawn(async move { + let task = spawn(async move { cond_wait_cloned.wait().await; count_cloned.fetch_add(1, Ordering::Relaxed); // do something @@ -99,7 +102,7 @@ mod tests { // Send a signal to the waiting task cond_wait.signal().await; - task.await; + let _ = task.await; // Reset the boolean flag cond_wait.reset().await; @@ -108,7 +111,7 @@ mod tests { let cond_wait_cloned = cond_wait.clone(); let count_cloned = count.clone(); - let task1 = smol::spawn(async move { + let task1 = spawn(async move { cond_wait_cloned.wait().await; count_cloned.fetch_add(1, Ordering::Relaxed); // do something @@ -116,7 +119,7 @@ mod tests { let cond_wait_cloned = cond_wait.clone(); let count_cloned = count.clone(); - let task2 = smol::spawn(async move { + let task2 = spawn(async move { cond_wait_cloned.wait().await; count_cloned.fetch_add(1, Ordering::Relaxed); // do something @@ -125,8 +128,8 @@ mod tests { // Broadcast a signal to all waiting tasks cond_wait.broadcast().await; - task1.await; - task2.await; + let _ = task1.await; + let _ = task2.await; assert_eq!(count.load(Ordering::Relaxed), 3); }); } diff --git a/core/src/async_util/executor.rs b/core/src/async_util/executor.rs deleted file mode 100644 index 3e7aa06..0000000 --- a/core/src/async_util/executor.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::{panic::catch_unwind, sync::Arc, thread}; - -use async_lock::OnceCell; -use smol::Executor as SmolEx; - -static GLOBAL_EXECUTOR: OnceCell<Arc<smol::Executor<'_>>> = OnceCell::new(); - -/// A pointer to an Executor -pub type Executor<'a> = Arc<SmolEx<'a>>; - -/// Returns a single-threaded global executor -pub(crate) fn global_executor() -> Executor<'static> { - fn init_executor() -> Executor<'static> { - let ex = smol::Executor::new(); - thread::Builder::new() - .spawn(|| loop { - catch_unwind(|| { - smol::block_on(global_executor().run(smol::future::pending::<()>())) - }) - .ok(); - }) - .expect("cannot spawn executor thread"); - // Prevent spawning another thread by running the process driver on this - // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs - ex.spawn(async_process::driver()).detach(); - Arc::new(ex) - } - - GLOBAL_EXECUTOR.get_or_init_blocking(init_executor).clone() -} diff --git a/core/src/async_util/mod.rs b/core/src/async_util/mod.rs index 2916118..54b9607 100644 --- a/core/src/async_util/mod.rs +++ b/core/src/async_util/mod.rs @@ -1,15 +1,15 @@ mod backoff; mod condvar; mod condwait; -mod executor; mod select; +mod sleep; mod task_group; mod timeout; pub use backoff::Backoff; pub use condvar::CondVar; pub use condwait::CondWait; -pub use executor::Executor; pub use select::{select, Either}; +pub use sleep::sleep; pub use task_group::{TaskGroup, TaskResult}; pub use timeout::timeout; diff --git a/core/src/async_util/select.rs b/core/src/async_util/select.rs index 0977fa9..2008cb5 100644 --- a/core/src/async_util/select.rs +++ b/core/src/async_util/select.rs @@ -1,8 +1,8 @@ +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use pin_project_lite::pin_project; -use smol::future::Future; /// Returns the result of the future that completes first, preferring future1 /// if both are ready. @@ -75,14 +75,16 @@ where #[cfg(test)] mod tests { - use super::{select, Either}; - use smol::Timer; use std::future; + use crate::{async_runtime::block_on, async_util::sleep}; + + use super::{select, Either}; + #[test] fn test_async_select() { - smol::block_on(async move { - let fut = select(Timer::never(), future::ready(0 as u32)).await; + block_on(async move { + let fut = select(sleep(std::time::Duration::MAX), future::ready(0 as u32)).await; assert!(matches!(fut, Either::Right(0))); let fut1 = future::pending::<String>(); diff --git a/core/src/async_util/sleep.rs b/core/src/async_util/sleep.rs new file mode 100644 index 0000000..f72b825 --- /dev/null +++ b/core/src/async_util/sleep.rs @@ -0,0 +1,6 @@ +pub async fn sleep(duration: std::time::Duration) { + #[cfg(feature = "smol")] + smol::Timer::after(duration).await; + #[cfg(feature = "tokio")] + tokio::time::sleep(duration).await; +} diff --git a/core/src/async_util/task_group.rs b/core/src/async_util/task_group.rs index 7f05696..5af75ed 100644 --- a/core/src/async_util/task_group.rs +++ b/core/src/async_util/task_group.rs @@ -1,8 +1,8 @@ use std::{future::Future, sync::Arc, sync::Mutex}; -use async_task::FallibleTask; +use crate::async_runtime::{global_executor, Executor, Task}; -use super::{executor::global_executor, select, CondWait, Either, Executor}; +use super::{select, CondWait, Either}; /// TaskGroup A group that contains spawned tasks. /// @@ -12,28 +12,25 @@ use super::{executor::global_executor, select, CondWait, Either, Executor}; /// /// use std::sync::Arc; /// -/// use karyon_core::async_util::TaskGroup; +/// use karyon_core::async_util::{TaskGroup, sleep}; /// /// async { +/// let group = TaskGroup::new(); /// -/// let ex = Arc::new(smol::Executor::new()); -/// let group = TaskGroup::with_executor(ex); -/// -/// group.spawn(smol::Timer::never(), |_| async {}); +/// group.spawn(sleep(std::time::Duration::MAX), |_| async {}); /// /// group.cancel().await; /// /// }; /// /// ``` -/// -pub struct TaskGroup<'a> { +pub struct TaskGroup { tasks: Mutex<Vec<TaskHandler>>, stop_signal: Arc<CondWait>, - executor: Executor<'a>, + executor: Executor, } -impl TaskGroup<'static> { +impl TaskGroup { /// Creates a new TaskGroup without providing an executor /// /// This will spawn a task onto a global executor (single-threaded by default). @@ -44,11 +41,9 @@ impl TaskGroup<'static> { executor: global_executor(), } } -} -impl<'a> TaskGroup<'a> { /// Creates a new TaskGroup by providing an executor - pub fn with_executor(executor: Executor<'a>) -> Self { + pub fn with_executor(executor: Executor) -> Self { Self { tasks: Mutex::new(Vec::new()), stop_signal: Arc::new(CondWait::new()), @@ -61,10 +56,10 @@ impl<'a> TaskGroup<'a> { /// parameter, indicating whether the task completed or was canceled. pub fn spawn<T, Fut, CallbackF, CallbackFut>(&self, fut: Fut, callback: CallbackF) where - T: Send + Sync + 'a, - Fut: Future<Output = T> + Send + 'a, - CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, - CallbackFut: Future<Output = ()> + Send + 'a, + T: Send + Sync + 'static, + Fut: Future<Output = T> + Send + 'static, + CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'static, + CallbackFut: Future<Output = ()> + Send + 'static, { let task = TaskHandler::new( self.executor.clone(), @@ -100,7 +95,7 @@ impl<'a> TaskGroup<'a> { } } -impl Default for TaskGroup<'static> { +impl Default for TaskGroup { fn default() -> Self { Self::new() } @@ -124,42 +119,40 @@ impl<T: std::fmt::Debug> std::fmt::Display for TaskResult<T> { /// TaskHandler pub struct TaskHandler { - task: FallibleTask<()>, + task: Task<()>, cancel_flag: Arc<CondWait>, } impl<'a> TaskHandler { /// Creates a new task handler fn new<T, Fut, CallbackF, CallbackFut>( - ex: Executor<'a>, + ex: Executor, fut: Fut, callback: CallbackF, stop_signal: Arc<CondWait>, ) -> TaskHandler where - T: Send + Sync + 'a, - Fut: Future<Output = T> + Send + 'a, - CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'a, - CallbackFut: Future<Output = ()> + Send + 'a, + T: Send + Sync + 'static, + Fut: Future<Output = T> + Send + 'static, + CallbackF: FnOnce(TaskResult<T>) -> CallbackFut + Send + 'static, + CallbackFut: Future<Output = ()> + Send + 'static, { let cancel_flag = Arc::new(CondWait::new()); let cancel_flag_c = cancel_flag.clone(); - let task = ex - .spawn(async move { - // Waits for either the stop signal or the task to complete. - let result = select(stop_signal.wait(), fut).await; + let task = ex.spawn(async move { + // Waits for either the stop signal or the task to complete. + let result = select(stop_signal.wait(), fut).await; - let result = match result { - Either::Left(_) => TaskResult::Cancelled, - Either::Right(res) => TaskResult::Completed(res), - }; + let result = match result { + Either::Left(_) => TaskResult::Cancelled, + Either::Right(res) => TaskResult::Completed(res), + }; - // Call the callback - callback(result).await; + // Call the callback + callback(result).await; - cancel_flag_c.signal().await; - }) - .fallible(); + cancel_flag_c.signal().await; + }); TaskHandler { task, cancel_flag } } @@ -173,14 +166,52 @@ impl<'a> TaskHandler { #[cfg(test)] mod tests { - use super::*; use std::{future, sync::Arc}; + use crate::async_runtime::block_on; + use crate::async_util::sleep; + + use super::*; + + #[cfg(feature = "tokio")] + #[test] + fn test_task_group_with_tokio_executor() { + let ex = Arc::new(tokio::runtime::Runtime::new().unwrap()); + ex.clone().block_on(async move { + let group = Arc::new(TaskGroup::with_executor(ex.into())); + + group.spawn(future::ready(0), |res| async move { + assert!(matches!(res, TaskResult::Completed(0))); + }); + + group.spawn(future::pending::<()>(), |res| async move { + assert!(matches!(res, TaskResult::Cancelled)); + }); + + let groupc = group.clone(); + group.spawn( + async move { + groupc.spawn(future::pending::<()>(), |res| async move { + assert!(matches!(res, TaskResult::Cancelled)); + }); + }, + |res| async move { + assert!(matches!(res, TaskResult::Completed(_))); + }, + ); + + // Do something + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + group.cancel().await; + }); + } + + #[cfg(feature = "smol")] #[test] - fn test_task_group_with_executor() { + fn test_task_group_with_smol_executor() { let ex = Arc::new(smol::Executor::new()); smol::block_on(ex.clone().run(async move { - let group = Arc::new(TaskGroup::with_executor(ex)); + let group = Arc::new(TaskGroup::with_executor(ex.into())); group.spawn(future::ready(0), |res| async move { assert!(matches!(res, TaskResult::Completed(0))); @@ -210,7 +241,7 @@ mod tests { #[test] fn test_task_group() { - smol::block_on(async { + block_on(async { let group = Arc::new(TaskGroup::new()); group.spawn(future::ready(0), |res| async move { @@ -234,7 +265,7 @@ mod tests { ); // Do something - smol::Timer::after(std::time::Duration::from_millis(50)).await; + sleep(std::time::Duration::from_millis(50)).await; group.cancel().await; }); } diff --git a/core/src/async_util/timeout.rs b/core/src/async_util/timeout.rs index cf3c490..9ac64c8 100644 --- a/core/src/async_util/timeout.rs +++ b/core/src/async_util/timeout.rs @@ -1,10 +1,9 @@ use std::{future::Future, time::Duration}; -use smol::Timer; - -use super::{select, Either}; use crate::{error::Error, Result}; +use super::{select, sleep, Either}; + /// Waits for a future to complete or times out if it exceeds a specified /// duration. /// @@ -26,7 +25,7 @@ pub async fn timeout<T, F>(delay: Duration, future1: F) -> Result<T> where F: Future<Output = T>, { - let result = select(Timer::after(delay), future1).await; + let result = select(sleep(delay), future1).await; match result { Either::Left(_) => Err(Error::Timeout), @@ -41,11 +40,11 @@ mod tests { #[test] fn test_timeout() { - smol::block_on(async move { + crate::async_runtime::block_on(async move { let fut = future::pending::<()>(); assert!(timeout(Duration::from_millis(10), fut).await.is_err()); - let fut = smol::Timer::after(Duration::from_millis(10)); + let fut = sleep(Duration::from_millis(10)); assert!(timeout(Duration::from_millis(50), fut).await.is_ok()) }); } diff --git a/core/src/error.rs b/core/src/error.rs index cc60696..2b8f641 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -20,11 +20,15 @@ pub enum Error { #[error(transparent)] Ed25519(#[from] ed25519_dalek::ed25519::Error), + #[cfg(feature = "tokio")] + #[error(transparent)] + TokioJoinError(#[from] tokio::task::JoinError), + #[error("Channel Send Error: {0}")] ChannelSend(String), #[error(transparent)] - ChannelRecv(#[from] smol::channel::RecvError), + ChannelRecv(#[from] async_channel::RecvError), #[error(transparent)] BincodeDecode(#[from] bincode::error::DecodeError), @@ -33,8 +37,8 @@ pub enum Error { BincodeEncode(#[from] bincode::error::EncodeError), } -impl<T> From<smol::channel::SendError<T>> for Error { - fn from(error: smol::channel::SendError<T>) -> Self { +impl<T> From<async_channel::SendError<T>> for Error { + fn from(error: async_channel::SendError<T>) -> Self { Error::ChannelSend(error.to_string()) } } diff --git a/core/src/event.rs b/core/src/event.rs index ef40205..e8692ef 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -5,14 +5,11 @@ use std::{ sync::{Arc, Weak}, }; +use async_channel::{Receiver, Sender}; use chrono::{DateTime, Utc}; use log::{error, trace}; -use smol::{ - channel::{Receiver, Sender}, - lock::Mutex, -}; -use crate::{util::random_16, Result}; +use crate::{async_runtime::lock::Mutex, util::random_16, Result}; pub type ArcEventSys<T> = Arc<EventSys<T>>; pub type WeakEventSys<T> = Weak<EventSys<T>>; @@ -139,7 +136,7 @@ where self: &Arc<Self>, topic: &T, ) -> EventListener<T, E> { - let chan = smol::channel::unbounded(); + let chan = async_channel::unbounded(); let topics = &mut self.listeners.lock().await; @@ -310,6 +307,8 @@ pub trait EventValueTopic: EventValueAny + EventValue { #[cfg(test)] mod tests { + use crate::async_runtime::block_on; + use super::*; #[derive(Hash, PartialEq, Eq, Debug, Clone)] @@ -337,11 +336,6 @@ mod tests { } #[derive(Clone, Debug, PartialEq)] - struct D { - d_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] struct E { e_value: usize, } @@ -369,12 +363,6 @@ mod tests { } } - impl EventValue for D { - fn id() -> &'static str { - "D" - } - } - impl EventValue for E { fn id() -> &'static str { "E" @@ -396,7 +384,7 @@ mod tests { #[test] fn test_event_sys() { - smol::block_on(async move { + block_on(async move { let event_sys = EventSys::<Topic>::new(); let a_listener = event_sys.register::<A>(&Topic::TopicA).await; diff --git a/core/src/lib.rs b/core/src/lib.rs index ae88188..62052a8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,8 +1,13 @@ +#[cfg(all(feature = "smol", feature = "tokio"))] +compile_error!("Only one async runtime feature should be enabled"); + +#[cfg(not(any(feature = "smol", feature = "tokio")))] +compile_error!("At least one async runtime feature must be enabled for this crate."); + /// A set of helper tools and functions. pub mod util; -/// A module containing async utilities that work with the -/// [`smol`](https://github.com/smol-rs/smol) async runtime. +/// A set of async utilities. pub mod async_util; /// Represents karyon's Core Error. @@ -14,8 +19,12 @@ pub mod event; /// A simple publish-subscribe system [`Read More`](./pubsub/struct.Publisher.html) pub mod pubsub; +/// A cross-compatible async runtime +pub mod async_runtime; + #[cfg(feature = "crypto")] + /// Collects common cryptographic tools pub mod crypto; -use error::Result; +pub use error::{Error, Result}; diff --git a/core/src/pubsub.rs b/core/src/pubsub.rs index f5cb69b..bcc24ef 100644 --- a/core/src/pubsub.rs +++ b/core/src/pubsub.rs @@ -1,9 +1,8 @@ use std::{collections::HashMap, sync::Arc}; use log::error; -use smol::lock::Mutex; -use crate::{util::random_16, Result}; +use crate::{async_runtime::lock::Mutex, util::random_16, Result}; pub type ArcPublisher<T> = Arc<Publisher<T>>; pub type SubscriptionID = u16; @@ -28,7 +27,7 @@ pub type SubscriptionID = u16; /// /// ``` pub struct Publisher<T> { - subs: Mutex<HashMap<SubscriptionID, smol::channel::Sender<T>>>, + subs: Mutex<HashMap<SubscriptionID, async_channel::Sender<T>>>, } impl<T: Clone> Publisher<T> { @@ -43,7 +42,7 @@ impl<T: Clone> Publisher<T> { pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> { let mut subs = self.subs.lock().await; - let chan = smol::channel::unbounded(); + let chan = async_channel::unbounded(); let mut sub_id = random_16(); @@ -84,7 +83,7 @@ impl<T: Clone> Publisher<T> { // Subscription pub struct Subscription<T> { id: SubscriptionID, - recv_chan: smol::channel::Receiver<T>, + recv_chan: async_channel::Receiver<T>, publisher: ArcPublisher<T>, } @@ -93,7 +92,7 @@ impl<T: Clone> Subscription<T> { pub fn new( id: SubscriptionID, publisher: ArcPublisher<T>, - recv_chan: smol::channel::Receiver<T>, + recv_chan: async_channel::Receiver<T>, ) -> Subscription<T> { Self { id, diff --git a/core/src/util/encode.rs b/core/src/util/encode.rs index 7d1061b..bf63671 100644 --- a/core/src/util/encode.rs +++ b/core/src/util/encode.rs @@ -1,15 +1,14 @@ use bincode::Encode; -use crate::Result; +use crate::{Error, Result}; /// Encode the given type `T` into a `Vec<u8>`. -pub fn encode<T: Encode>(msg: &T) -> Result<Vec<u8>> { - let vec = bincode::encode_to_vec(msg, bincode::config::standard())?; +pub fn encode<T: Encode>(src: &T) -> Result<Vec<u8>> { + let vec = bincode::encode_to_vec(src, bincode::config::standard())?; Ok(vec) } /// Encode the given type `T` into the given slice.. -pub fn encode_into_slice<T: Encode>(msg: &T, dst: &mut [u8]) -> Result<()> { - bincode::encode_into_slice(msg, dst, bincode::config::standard())?; - Ok(()) +pub fn encode_into_slice<T: Encode>(src: &T, dst: &mut [u8]) -> Result<usize> { + bincode::encode_into_slice(src, dst, bincode::config::standard()).map_err(Error::from) } diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 73ae275..e81ec10 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -1,28 +1,49 @@ [package] name = "karyon_jsonrpc" -version.workspace = true +version.workspace = true edition.workspace = true -autoexamples = false + +[features] +default = ["smol"] +smol = [ + "karyon_core/smol", + "karyon_net/smol", + "karyon_jsonrpc_internal/smol", + "karyon_jsonrpc_macro/smol", + "dep:futures-rustls", + "async-tungstenite/async-std-runtime", +] +tokio = [ + "karyon_core/tokio", + "karyon_net/tokio", + "karyon_jsonrpc_internal/tokio", + "karyon_jsonrpc_macro/tokio", + "async-tungstenite/tokio-runtime", + "dep:tokio-rustls", +] [dependencies] -karyon_core.workspace = true -karyon_net.workspace = true +karyon_core = { workspace = true, default-features = false } +karyon_net = { workspace = true, default-features = false } + +karyon_jsonrpc_macro = { path = "jsonrpc_macro", default-features = false } +karyon_jsonrpc_internal = { path = "jsonrpc_internal", default-features = false } -smol = "2.0.0" log = "0.4.21" rand = "0.8.5" +async-tungstenite = { version = "0.25.0", default-features = false } serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" thiserror = "1.0.58" memchr = "2.7.1" +async-trait = "0.1.77" -[[example]] -name = "server" -path = "examples/server.rs" - -[[example]] -name = "client" -path = "examples/client.rs" +futures-rustls = { version = "0.25.1", optional = true } +tokio-rustls = { version = "0.26.0", optional = true } [dev-dependencies] env_logger = "0.11.3" +rcgen = "0.13.1" +rustls-pemfile = "2.1.2" +smol = "2.0.0" +webpki-roots = "0.26.1" diff --git a/jsonrpc/README.md b/jsonrpc/README.md index af7dfe2..98c18e1 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -1,52 +1,73 @@ # karyon jsonrpc A fast and lightweight async implementation of [JSON-RPC -2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. +2.0](https://www.jsonrpc.org/specification). + +features: +- Supports TCP, TLS, WebSocket, and Unix protocols. +- Uses smol(async-std) as the async runtime, but also supports tokio via the + `tokio` feature. +- Allows registration of multiple services (structs) of different types on a + single server. ## Example -```rust +``` use std::sync::Arc; use serde_json::Value; use smol::net::{TcpStream, TcpListener}; -use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; +use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; struct HelloWorld {} +#[rpc_impl] impl HelloWorld { - async fn say_hello(&self, params: Value) -> Result<Value, JsonRPCError> { + async fn say_hello(&self, params: Value) -> Result<Value, Error> { let msg: String = serde_json::from_value(params)?; Ok(serde_json::json!(format!("Hello {msg}!"))) } -} -let ex = Arc::new(smol::Executor::new()); + async fn foo(&self, params: Value) -> Result<Value, Error> { + Ok(serde_json::json!("foo!")) + } -////////////////// -// Server -////////////////// -// Creates a new server -let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); -let config = ServerConfig::default(); -let server = Server::new(listener, config, ex.clone()); + async fn bar(&self, params: Value) -> Result<Value, Error> { + Ok(serde_json::json!("bar!")) + } +} -// Register the HelloWorld service -register_service!(HelloWorld, say_hello); -server.attach_service(HelloWorld{}); +// Server +async { + // Creates a new server + let server = Server::builder("tcp://127.0.0.1:60000") + .expect("create new server builder") + .service(HelloWorld{}) + .build() + .await + .expect("build the server"); -// Starts the server -ex.run(server.start()); + // Starts the server + server.start().await.expect("start the server"); +}; -////////////////// -// Client -////////////////// -// Creates a new client -let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); -let config = ClientConfig::default(); -let client = Client::new(conn, config); +// Client +async { + // Creates a new client + let client = Client::builder("tcp://127.0.0.1:60000") + .expect("create new client builder") + .build() + .await + .expect("build the client"); -let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); + let result: String = client.call("HelloWorld.say_hello", "world".to_string()) + .await + .expect("send a request"); +}; ``` + + + + diff --git a/jsonrpc/examples/client.py b/jsonrpc/examples/client.py index 2066e82..745d5db 100644 --- a/jsonrpc/examples/client.py +++ b/jsonrpc/examples/client.py @@ -3,7 +3,7 @@ import random import json HOST = "127.0.0.1" -PORT = 60000 +PORT = 6000 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) @@ -15,7 +15,7 @@ req = { "params": {"x": 4, "y": 3}, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -27,7 +27,7 @@ req = { "params": {"x": 4, "y": 3}, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -39,7 +39,7 @@ req = { "params": None, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) @@ -51,7 +51,7 @@ req = { "params": None, } print("Send: ", req) -s.sendall((json.dumps(req) + '\n').encode()) +s.sendall((json.dumps(req)).encode()) res = s.recv(1024) res = json.loads(res) print("Received: ", res) diff --git a/jsonrpc/examples/client.rs b/jsonrpc/examples/client.rs index 2c8cf83..3289772 100644 --- a/jsonrpc/examples/client.rs +++ b/jsonrpc/examples/client.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; -use smol::net::TcpStream; -use karyon_jsonrpc::{Client, ClientConfig}; +use karyon_jsonrpc::Client; #[derive(Deserialize, Serialize)] struct Req { @@ -15,9 +14,11 @@ struct Pong {} fn main() { env_logger::init(); smol::future::block_on(async { - let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); - let config = ClientConfig::default(); - let client = Client::new(conn, config); + let client = Client::builder("tcp://127.0.0.1:6000") + .expect("Create client builder") + .build() + .await + .unwrap(); let params = Req { x: 10, y: 7 }; let result: u32 = client.call("Calc.add", params).await.unwrap(); diff --git a/jsonrpc/examples/server.rs b/jsonrpc/examples/server.rs index 6953433..841e276 100644 --- a/jsonrpc/examples/server.rs +++ b/jsonrpc/examples/server.rs @@ -1,10 +1,7 @@ -use std::sync::Arc; - use serde::{Deserialize, Serialize}; use serde_json::Value; -use smol::net::TcpListener; -use karyon_jsonrpc::{register_service, JsonRPCError, Server, ServerConfig}; +use karyon_jsonrpc::{rpc_impl, Error, Server}; struct Calc { version: String, @@ -19,43 +16,44 @@ struct Req { #[derive(Deserialize, Serialize)] struct Pong {} +#[rpc_impl] impl Calc { - async fn ping(&self, _params: Value) -> Result<Value, JsonRPCError> { + async fn ping(&self, _params: Value) -> Result<Value, Error> { Ok(serde_json::json!(Pong {})) } - async fn add(&self, params: Value) -> Result<Value, JsonRPCError> { + async fn add(&self, params: Value) -> Result<Value, Error> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x + params.y)) } - async fn sub(&self, params: Value) -> Result<Value, JsonRPCError> { + async fn sub(&self, params: Value) -> Result<Value, Error> { let params: Req = serde_json::from_value(params)?; Ok(serde_json::json!(params.x - params.y)) } - async fn version(&self, _params: Value) -> Result<Value, JsonRPCError> { + async fn version(&self, _params: Value) -> Result<Value, Error> { Ok(serde_json::json!(self.version)) } } fn main() { env_logger::init(); - let ex = Arc::new(smol::Executor::new()); - smol::block_on(ex.clone().run(async { - // Creates a new server - let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); - let config = ServerConfig::default(); - let server = Server::new(listener, config, ex); - + smol::block_on(async { // Register the Calc service - register_service!(Calc, ping, add, sub, version); let calc = Calc { version: String::from("0.1"), }; - server.attach_service(calc).await; + + // Creates a new server + let server = Server::builder("tcp://127.0.0.1:6000") + .expect("Create a new server builder") + .service(calc) + .build() + .await + .expect("start a new server"); // Start the server server.start().await.unwrap(); - })); + }); } diff --git a/jsonrpc/jsonrpc_internal/Cargo.toml b/jsonrpc/jsonrpc_internal/Cargo.toml new file mode 100644 index 0000000..5a3acc4 --- /dev/null +++ b/jsonrpc/jsonrpc_internal/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "karyon_jsonrpc_internal" +version.workspace = true +edition.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["smol"] +smol = ["karyon_core/smol", "karyon_net/smol"] +tokio = ["karyon_core/tokio", "karyon_net/tokio"] + +[dependencies] +karyon_core = { workspace = true, default-features = false } +karyon_net = { workspace = true, default-features = false } + +serde_json = "1.0.114" +thiserror = "1.0.58" diff --git a/jsonrpc/src/error.rs b/jsonrpc/jsonrpc_internal/src/error.rs index 8bc8c49..7f89729 100644 --- a/jsonrpc/src/error.rs +++ b/jsonrpc/jsonrpc_internal/src/error.rs @@ -26,9 +26,15 @@ pub enum Error { #[error("Invalid Message Error: {0}")] InvalidMsg(&'static str), + #[error("Unsupported protocol: {0}")] + UnsupportedProtocol(String), + + #[error("Unexpected Error: {0}")] + General(&'static str), + #[error(transparent)] KaryonCore(#[from] karyon_core::error::Error), #[error(transparent)] - KaryonNet(#[from] karyon_net::NetError), + KaryonNet(#[from] karyon_net::Error), } diff --git a/jsonrpc/src/service.rs b/jsonrpc/jsonrpc_internal/src/lib.rs index 23a50d9..95af82a 100644 --- a/jsonrpc/src/service.rs +++ b/jsonrpc/jsonrpc_internal/src/lib.rs @@ -1,6 +1,7 @@ +mod error; use std::{future::Future, pin::Pin}; -use crate::Result; +pub use error::{Error, Result}; /// Represents the RPC method pub type RPCMethod<'a> = Box<dyn Fn(serde_json::Value) -> RPCMethodOutput<'a> + Send + 'a>; @@ -20,31 +21,31 @@ pub trait RPCService: Sync + Send { /// ``` /// use serde_json::Value; /// -/// use karyon_jsonrpc::{JsonRPCError, register_service}; +/// use karyon_jsonrpc_internal::{Error, impl_rpc_service}; /// /// struct Hello {} /// /// impl Hello { -/// async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> { +/// async fn foo(&self, params: Value) -> Result<Value, Error> { /// Ok(serde_json::json!("foo!")) /// } /// -/// async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> { +/// async fn bar(&self, params: Value) -> Result<Value, Error> { /// Ok(serde_json::json!("bar!")) /// } /// } /// -/// register_service!(Hello, foo, bar); +/// impl_rpc_service!(Hello, foo, bar); /// /// ``` #[macro_export] -macro_rules! register_service { +macro_rules! impl_rpc_service { ($t:ty, $($m:ident),*) => { - impl karyon_jsonrpc::RPCService for $t { + impl karyon_jsonrpc_internal::RPCService for $t { fn get_method<'a>( &'a self, name: &'a str - ) -> Option<karyon_jsonrpc::RPCMethod> { + ) -> Option<karyon_jsonrpc_internal::RPCMethod> { match name { $( stringify!($m) => { diff --git a/jsonrpc/jsonrpc_macro/Cargo.toml b/jsonrpc/jsonrpc_macro/Cargo.toml new file mode 100644 index 0000000..17140c5 --- /dev/null +++ b/jsonrpc/jsonrpc_macro/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "karyon_jsonrpc_macro" +version.workspace = true +edition.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +proc-macro = true + +[features] +default = ["smol"] +smol = ["karyon_jsonrpc_internal/smol"] +tokio = ["karyon_jsonrpc_internal/tokio"] + +[dependencies] +karyon_jsonrpc_internal = { path = "../jsonrpc_internal", default-features = false } + +proc-macro2 = "1.0" +quote = "1.0" +syn = { version = "1.0", features = ["full"] } + +serde_json = "1.0.114" + diff --git a/jsonrpc/jsonrpc_macro/src/lib.rs b/jsonrpc/jsonrpc_macro/src/lib.rs new file mode 100644 index 0000000..f2015d4 --- /dev/null +++ b/jsonrpc/jsonrpc_macro/src/lib.rs @@ -0,0 +1,47 @@ +use proc_macro::TokenStream; +use proc_macro2::{Ident, TokenStream as TokenStream2}; +use quote::quote; +use syn::{parse_macro_input, spanned::Spanned, ImplItem, ItemImpl, Type}; + +macro_rules! err { + ($($tt:tt)*) => { + return syn::Error::new($($tt)*).to_compile_error().into() + }; +} + +#[proc_macro_attribute] +pub fn rpc_impl(_attr: TokenStream, item: TokenStream) -> TokenStream { + let mut methods: Vec<Ident> = vec![]; + + let item2 = item.clone(); + let parsed_input = parse_macro_input!(item2 as ItemImpl); + + let self_ty = match *parsed_input.self_ty { + Type::Path(p) => p, + _ => err!( + parsed_input.span(), + "implementing the trait `RPCService` on this type is unsupported" + ), + }; + + if parsed_input.items.is_empty() { + err!(self_ty.span(), "At least one method should be implemented"); + } + + for item in parsed_input.items { + match item { + ImplItem::Method(method) => { + methods.push(method.sig.ident); + } + _ => err!(item.span(), "unexpected item"), + } + } + + let item2: TokenStream2 = item.into(); + let quoted = quote! { + karyon_jsonrpc_internal::impl_rpc_service!(#self_ty, #(#methods),*); + #item2 + }; + + quoted.into() +} diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index efbaf50..50d772b 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -1,37 +1,32 @@ +use std::time::Duration; + use log::debug; use serde::{de::DeserializeOwned, Serialize}; -use karyon_core::util::random_32; -use karyon_net::ToConn; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use crate::{ - codec::{Codec, CodecConfig}, - message, Error, Result, JSONRPC_VERSION, +use karyon_core::{async_util::timeout, util::random_32}; +use karyon_net::{ + tls::ClientTlsConfig, + ws::{ClientWsConfig, ClientWssConfig}, + Conn, Endpoint, ToEndpoint, }; -/// Represents client config -#[derive(Default)] -pub struct ClientConfig { - pub timeout: Option<u64>, -} +use crate::{ + codec::{JsonCodec, WsJsonCodec}, + message, Error, Result, +}; /// Represents an RPC client pub struct Client { - codec: Codec, - config: ClientConfig, + conn: Conn<serde_json::Value>, + timeout: Option<u64>, } impl Client { - /// Creates a new RPC client by passing a Tcp, Unix, or Tls connection. - pub fn new<C: ToConn>(conn: C, config: ClientConfig) -> Self { - let codec_config = CodecConfig { - max_allowed_buffer_size: 0, - ..Default::default() - }; - let codec = Codec::new(conn.to_conn(), codec_config); - Self { codec, config } - } - /// Calls the provided method, waits for the response, and returns the result. pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>( &self, @@ -41,38 +36,122 @@ impl Client { let id = serde_json::json!(random_32()); let request = message::Request { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), id, method: method.to_string(), params: serde_json::json!(params), }; - let mut payload = serde_json::to_vec(&request)?; - payload.push(b'\n'); - self.codec.write_all(&payload).await?; + let req_json = serde_json::to_value(&request)?; + match self.timeout { + Some(s) => { + let dur = Duration::from_secs(s); + timeout(dur, self.conn.send(req_json)).await??; + } + None => { + self.conn.send(req_json).await?; + } + } debug!("--> {request}"); - let mut buffer = vec![]; - if let Some(t) = self.config.timeout { - self.codec.read_until_with_timeout(&mut buffer, t).await?; - } else { - self.codec.read_until(&mut buffer).await?; - }; - - let response = serde_json::from_slice::<message::Response>(&buffer)?; + let msg = self.conn.recv().await?; + let response = serde_json::from_value::<message::Response>(msg)?; debug!("<-- {response}"); - if let Some(error) = response.error { - return Err(Error::CallError(error.code, error.message)); - } - if response.id.is_none() || response.id.unwrap() != request.id { return Err(Error::InvalidMsg("Invalid response id")); } + if let Some(error) = response.error { + return Err(Error::CallError(error.code, error.message)); + } + match response.result { Some(result) => Ok(serde_json::from_value::<V>(result)?), None => Err(Error::InvalidMsg("Invalid response result")), } } } + +pub struct ClientBuilder { + endpoint: Endpoint, + tls_config: Option<(rustls::ClientConfig, String)>, + timeout: Option<u64>, +} + +impl ClientBuilder { + pub fn with_timeout(mut self, timeout: u64) -> Self { + self.timeout = Some(timeout); + self + } + + pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some((config, dns_name.to_string())); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result<Client> { + let conn: Conn<serde_json::Value> = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::tls::dial( + &self.endpoint, + ClientTlsConfig { + dns_name, + client_config: conf, + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match self.tls_config { + Some((conf, dns_name)) => Box::new( + karyon_net::ws::dial( + &self.endpoint, + ClientWsConfig { + tcp_config: Default::default(), + wss_config: Some(ClientWssConfig { + dns_name, + client_config: conf, + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::dial(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new( + karyon_net::unix::dial(&self.endpoint, Default::default(), JsonCodec {}).await?, + ), + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + Ok(Client { + timeout: self.timeout, + conn, + }) + } +} +impl Client { + pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> { + let endpoint = endpoint.to_endpoint()?; + Ok(ClientBuilder { + endpoint, + timeout: None, + tls_config: None, + }) + } +} diff --git a/jsonrpc/src/codec.rs b/jsonrpc/src/codec.rs index 4a70412..74415c7 100644 --- a/jsonrpc/src/codec.rs +++ b/jsonrpc/src/codec.rs @@ -1,100 +1,73 @@ -use memchr::memchr; +use async_tungstenite::tungstenite::Message; -use karyon_core::async_util::timeout; -use karyon_net::Conn; +use karyon_net::{ + codec::{Codec, Decoder, Encoder, WebSocketCodec, WebSocketDecoder, WebSocketEncoder}, + Error, Result, +}; -use crate::{Error, Result}; - -const DEFAULT_BUFFER_SIZE: usize = 1024; -const DEFAULT_MAX_ALLOWED_BUFFER_SIZE: usize = 1024 * 1024; // 1MB - -// TODO: Add unit tests for Codec's functions. - -/// Represents Codec config #[derive(Clone)] -pub struct CodecConfig { - pub default_buffer_size: usize, - /// The maximum allowed buffer size to receive a message. If set to zero, - /// there will be no size limit. - pub max_allowed_buffer_size: usize, -} - -impl Default for CodecConfig { - fn default() -> Self { - Self { - default_buffer_size: DEFAULT_BUFFER_SIZE, - max_allowed_buffer_size: DEFAULT_MAX_ALLOWED_BUFFER_SIZE, - } - } -} +pub struct JsonCodec {} -pub struct Codec { - conn: Conn, - config: CodecConfig, +impl Codec for JsonCodec { + type Item = serde_json::Value; } -impl Codec { - /// Creates a new Codec - pub fn new(conn: Conn, config: CodecConfig) -> Self { - Self { conn, config } +impl Encoder for JsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + let buf = msg.as_bytes(); + dst[..buf.len()].copy_from_slice(buf); + Ok(buf.len()) } +} - /// Read all bytes into `buffer` until the `0x0A` byte or EOF is - /// reached. - /// - /// If successful, this function will return the total number of bytes read. - pub async fn read_until(&self, buffer: &mut Vec<u8>) -> Result<usize> { - let delim = b'\n'; - - let mut read = 0; - - loop { - let mut tmp_buf = vec![0; self.config.default_buffer_size]; - let n = self.conn.read(&mut tmp_buf).await?; - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - - match memchr(delim, &tmp_buf) { - Some(i) => { - buffer.extend_from_slice(&tmp_buf[..=i]); - read += i + 1; - break; - } - None => { - buffer.extend_from_slice(&tmp_buf); - read += tmp_buf.len(); - } - } +impl Decoder for JsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + let de = serde_json::Deserializer::from_slice(src); + let mut iter = de.into_iter::<serde_json::Value>(); - if self.config.max_allowed_buffer_size != 0 - && buffer.len() == self.config.max_allowed_buffer_size - { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size", - )); - } - } + let item = match iter.next() { + Some(Ok(item)) => item, + Some(Err(ref e)) if e.is_eof() => return Ok(None), + Some(Err(e)) => return Err(Error::Encode(e.to_string())), + None => return Ok(None), + }; - Ok(read) + Ok(Some((iter.byte_offset(), item))) } +} - /// Writes an entire buffer into the given connection. - pub async fn write_all(&self, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.write(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at(n); - buf = rest; - - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - } +#[derive(Clone)] +pub struct WsJsonCodec {} +impl WebSocketCodec for WsJsonCodec { + type Item = serde_json::Value; +} - Ok(()) +impl WebSocketEncoder for WsJsonCodec { + type EnItem = serde_json::Value; + fn encode(&self, src: &Self::EnItem) -> Result<Message> { + let msg = match serde_json::to_string(src) { + Ok(m) => m, + Err(err) => return Err(Error::Encode(err.to_string())), + }; + Ok(Message::Text(msg)) } +} - pub async fn read_until_with_timeout(&self, buffer: &mut Vec<u8>, t: u64) -> Result<usize> { - timeout(std::time::Duration::from_secs(t), self.read_until(buffer)).await? +impl WebSocketDecoder for WsJsonCodec { + type DeItem = serde_json::Value; + fn decode(&self, src: &Message) -> Result<Self::DeItem> { + match src { + Message::Text(s) => match serde_json::from_str(s) { + Ok(m) => Ok(m), + Err(err) => Err(Error::Decode(err.to_string())), + }, + _ => Err(Error::Decode("Receive wrong message".to_string())), + } } } diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index 3e0eb8f..1410a62 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -1,5 +1,12 @@ //! A fast and lightweight async implementation of [JSON-RPC -//! 2.0](https://www.jsonrpc.org/specification), supporting the Tcp and Unix protocols. +//! 2.0](https://www.jsonrpc.org/specification). +//! +//! features: +//! - Supports TCP, TLS, WebSocket, and Unix protocols. +//! - Uses smol(async-std) as the async runtime, but also supports tokio via +//! the `tokio` feature. +//! - Allows registration of multiple services (structs) of different types on a +//! single server. //! //! # Example //! @@ -9,69 +16,65 @@ //! use serde_json::Value; //! use smol::net::{TcpStream, TcpListener}; //! -//! use karyon_jsonrpc::{JsonRPCError, Server, Client, register_service, ServerConfig, ClientConfig}; +//! use karyon_jsonrpc::{Error, Server, Client, rpc_impl}; //! //! struct HelloWorld {} //! +//! #[rpc_impl] //! impl HelloWorld { -//! async fn say_hello(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn say_hello(&self, params: Value) -> Result<Value, Error> { //! let msg: String = serde_json::from_value(params)?; //! Ok(serde_json::json!(format!("Hello {msg}!"))) //! } //! -//! async fn foo(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn foo(&self, params: Value) -> Result<Value, Error> { //! Ok(serde_json::json!("foo!")) //! } //! -//! async fn bar(&self, params: Value) -> Result<Value, JsonRPCError> { +//! async fn bar(&self, params: Value) -> Result<Value, Error> { //! Ok(serde_json::json!("bar!")) //! } //! } //! //! // Server //! async { -//! let ex = Arc::new(smol::Executor::new()); -//! //! // Creates a new server -//! let listener = TcpListener::bind("127.0.0.1:60000").await.unwrap(); -//! let config = ServerConfig::default(); -//! let server = Server::new(listener, config, ex.clone()); -//! -//! // Register the HelloWorld service -//! register_service!(HelloWorld, say_hello, foo, bar); -//! server.attach_service(HelloWorld{}); +//! let server = Server::builder("tcp://127.0.0.1:60000") +//! .expect("create new server builder") +//! .service(HelloWorld{}) +//! .build() +//! .await +//! .expect("build the server"); //! //! // Starts the server -//! ex.run(server.start()); +//! server.start().await.expect("start the server"); //! }; //! //! // Client //! async { -//! //! // Creates a new client -//! let conn = TcpStream::connect("127.0.0.1:60000").await.unwrap(); -//! let config = ClientConfig::default(); -//! let client = Client::new(conn, config); -//! -//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()).await.unwrap(); +//! let client = Client::builder("tcp://127.0.0.1:60000") +//! .expect("create new client builder") +//! .build() +//! .await +//! .expect("build the client"); +//! +//! let result: String = client.call("HelloWorld.say_hello", "world".to_string()) +//! .await +//! .expect("send a request"); //! }; //! //! ``` mod client; mod codec; -mod error; pub mod message; mod server; -mod service; -pub use client::{Client, ClientConfig}; -pub use codec::CodecConfig; -pub use error::Error as JsonRPCError; -pub use server::{Server, ServerConfig}; -pub use service::{RPCMethod, RPCService}; +pub use client::Client; +pub use server::Server; +pub use karyon_jsonrpc_internal::{impl_rpc_service, RPCMethod, RPCService}; +pub use karyon_jsonrpc_internal::{Error, Result}; +pub use karyon_jsonrpc_macro::rpc_impl; pub use karyon_net::Endpoint; - -const JSONRPC_VERSION: &str = "2.0"; -use error::{Error, Result}; diff --git a/jsonrpc/src/message.rs b/jsonrpc/src/message.rs index 89ef613..f4bf490 100644 --- a/jsonrpc/src/message.rs +++ b/jsonrpc/src/message.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +pub const JSONRPC_VERSION: &str = "2.0"; + /// Parse error: Invalid JSON was received by the server. pub const PARSE_ERROR_CODE: i32 = -32700; diff --git a/jsonrpc/src/server.rs b/jsonrpc/src/server.rs index 26d632a..1cc7e1f 100644 --- a/jsonrpc/src/server.rs +++ b/jsonrpc/src/server.rs @@ -1,17 +1,20 @@ use std::{collections::HashMap, sync::Arc}; use log::{debug, error, warn}; -use smol::lock::RwLock; -use karyon_core::async_util::{Executor, TaskGroup, TaskResult}; +#[cfg(feature = "smol")] +use futures_rustls::rustls; +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; -use karyon_net::{Conn, Listener, ToListener}; +use karyon_core::async_runtime::Executor; +use karyon_core::async_util::{TaskGroup, TaskResult}; + +use karyon_net::{Conn, Endpoint, Listener, ToEndpoint}; use crate::{ - codec::{Codec, CodecConfig}, - message, - service::RPCService, - Endpoint, Error, Result, JSONRPC_VERSION, + codec::{JsonCodec, WsJsonCodec}, + message, Error, RPCService, Result, }; pub const INVALID_REQUEST_ERROR_MSG: &str = "Invalid request"; @@ -27,69 +30,50 @@ fn pack_err_res(code: i32, msg: &str, id: Option<serde_json::Value>) -> message: }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: Some(err), result: None, id, } } -/// RPC server config -#[derive(Default)] -pub struct ServerConfig { - codec_config: CodecConfig, -} - /// Represents an RPC server -pub struct Server<'a> { - listener: Listener, - services: RwLock<HashMap<String, Box<dyn RPCService + 'a>>>, - task_group: TaskGroup<'a>, - config: ServerConfig, +pub struct Server { + listener: Listener<serde_json::Value>, + task_group: TaskGroup, + services: HashMap<String, Box<dyn RPCService + 'static>>, } -impl<'a> Server<'a> { - /// Creates a new RPC server by passing a listener. It supports Tcp, Unix, and Tls. - pub fn new<T: ToListener>(listener: T, config: ServerConfig, ex: Executor<'a>) -> Arc<Self> { - Arc::new(Self { - listener: listener.to_listener(), - services: RwLock::new(HashMap::new()), - task_group: TaskGroup::with_executor(ex), - config, - }) - } - +impl Server { /// Returns the local endpoint. pub fn local_endpoint(&self) -> Result<Endpoint> { - self.listener.local_endpoint().map_err(Error::KaryonNet) + self.listener.local_endpoint().map_err(Error::from) } /// Starts the RPC server pub async fn start(self: Arc<Self>) -> Result<()> { loop { - let conn = self.listener.accept().await?; - if let Err(err) = self.handle_conn(conn).await { - error!("Failed to handle a new conn: {err}") + match self.listener.accept().await { + Ok(conn) => { + if let Err(err) = self.handle_conn(conn).await { + error!("Failed to handle a new conn: {err}") + } + } + Err(err) => { + error!("Failed to accept a new conn: {err}") + } } } } - /// Attach a new service to the RPC server - pub async fn attach_service(&self, service: impl RPCService + 'a) { - self.services - .write() - .await - .insert(service.name(), Box::new(service)); - } - /// Shuts down the RPC server pub async fn shutdown(&self) { self.task_group.cancel().await; } /// Handles a new connection - async fn handle_conn(self: &Arc<Self>, conn: Conn) -> Result<()> { - let endpoint = conn.peer_endpoint()?; + async fn handle_conn(self: &Arc<Self>, conn: Conn<serde_json::Value>) -> Result<()> { + let endpoint = conn.peer_endpoint().expect("get peer endpoint"); debug!("Handle a new connection {endpoint}"); let on_failure = |result: TaskResult<Result<()>>| async move { @@ -100,19 +84,15 @@ impl<'a> Server<'a> { } }; - let codec = Codec::new(conn, self.config.codec_config.clone()); - let selfc = self.clone(); self.task_group.spawn( async move { loop { - let mut buffer = vec![]; - codec.read_until(&mut buffer).await?; - let response = selfc.handle_request(&buffer).await; - let mut payload = serde_json::to_vec(&response)?; - payload.push(b'\n'); - codec.write_all(&payload).await?; + let msg = conn.recv().await?; + let response = selfc.handle_request(msg).await; + let response = serde_json::to_value(response)?; debug!("--> {response}"); + conn.send(response).await?; } }, on_failure, @@ -122,14 +102,13 @@ impl<'a> Server<'a> { } /// Handles a request - async fn handle_request(&self, buffer: &[u8]) -> message::Response { - let rpc_msg = match serde_json::from_slice::<message::Request>(buffer) { + async fn handle_request(&self, msg: serde_json::Value) -> message::Response { + let rpc_msg = match serde_json::from_value::<message::Request>(msg) { Ok(m) => m, Err(_) => { return pack_err_res(message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG, None); } }; - debug!("<-- {rpc_msg}"); let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect(); @@ -144,9 +123,7 @@ impl<'a> Server<'a> { let srvc_name = srvc_method[0]; let method_name = srvc_method[1]; - let services = self.services.read().await; - - let service = match services.get(srvc_name) { + let service = match self.services.get(srvc_name) { Some(s) => s, None => { return pack_err_res( @@ -196,10 +173,108 @@ impl<'a> Server<'a> { }; message::Response { - jsonrpc: JSONRPC_VERSION.to_string(), + jsonrpc: message::JSONRPC_VERSION.to_string(), error: None, result: Some(result), id: Some(rpc_msg.id), } } } + +pub struct ServerBuilder { + endpoint: Endpoint, + tls_config: Option<rustls::ServerConfig>, + services: HashMap<String, Box<dyn RPCService + 'static>>, +} + +impl ServerBuilder { + pub fn service(mut self, service: impl RPCService + 'static) -> Self { + self.services.insert(service.name(), Box::new(service)); + self + } + + pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> { + match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => { + self.tls_config = Some(config); + Ok(self) + } + _ => Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + } + } + + pub async fn build(self) -> Result<Arc<Server>> { + self._build(TaskGroup::new()).await + } + + pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> { + self._build(TaskGroup::with_executor(ex)).await + } + + async fn _build(self, task_group: TaskGroup) -> Result<Arc<Server>> { + let listener: Listener<serde_json::Value> = match self.endpoint { + Endpoint::Tcp(..) | Endpoint::Tls(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::tls::listen( + &self.endpoint, + karyon_net::tls::ServerTlsConfig { + server_config: conf.clone(), + tcp_config: Default::default(), + }, + JsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::tcp::listen(&self.endpoint, Default::default(), JsonCodec {}) + .await?, + ), + }, + Endpoint::Ws(..) | Endpoint::Wss(..) => match &self.tls_config { + Some(conf) => Box::new( + karyon_net::ws::listen( + &self.endpoint, + karyon_net::ws::ServerWsConfig { + tcp_config: Default::default(), + wss_config: Some(karyon_net::ws::ServerWssConfig { + server_config: conf.clone(), + }), + }, + WsJsonCodec {}, + ) + .await?, + ), + None => Box::new( + karyon_net::ws::listen(&self.endpoint, Default::default(), WsJsonCodec {}) + .await?, + ), + }, + Endpoint::Unix(..) => Box::new(karyon_net::unix::listen( + &self.endpoint, + Default::default(), + JsonCodec {}, + )?), + + _ => return Err(Error::UnsupportedProtocol(self.endpoint.to_string())), + }; + + Ok(Arc::new(Server { + listener, + task_group, + services: self.services, + })) + } +} + +impl ServerBuilder {} + +impl Server { + pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> { + let endpoint = endpoint.to_endpoint()?; + Ok(ServerBuilder { + endpoint, + services: HashMap::new(), + tls_config: None, + }) + } +} diff --git a/jsonrpc/tests/impl_rpc_service.rs b/jsonrpc/tests/impl_rpc_service.rs new file mode 100644 index 0000000..e590ae1 --- /dev/null +++ b/jsonrpc/tests/impl_rpc_service.rs @@ -0,0 +1,27 @@ +use karyon_jsonrpc::{impl_rpc_service, Error, RPCService}; +use serde_json::Value; + +#[test] +fn service() { + struct Foo {} + + impl Foo { + async fn foo(&self, params: Value) -> Result<Value, Error> { + Ok(params) + } + } + + impl_rpc_service!(Foo, foo); + + let f = Foo {}; + + assert!(f.get_method("foo").is_some()); + assert!(f.get_method("bar").is_none()); + + let params = serde_json::json!("params"); + + smol::block_on(async { + let foo_method = f.get_method("foo").unwrap(); + assert_eq!(foo_method(params.clone()).await.unwrap(), params); + }); +} diff --git a/jsonrpc/tests/rpc_impl.rs b/jsonrpc/tests/rpc_impl.rs new file mode 100644 index 0000000..5b14b59 --- /dev/null +++ b/jsonrpc/tests/rpc_impl.rs @@ -0,0 +1,26 @@ +use karyon_jsonrpc::{rpc_impl, Error, RPCService}; +use serde_json::Value; + +#[test] +fn rpc_impl_service() { + struct Foo {} + + #[rpc_impl] + impl Foo { + async fn foo(&self, params: Value) -> Result<Value, Error> { + Ok(params) + } + } + + let f = Foo {}; + + assert!(f.get_method("foo").is_some()); + assert!(f.get_method("bar").is_none()); + + let params = serde_json::json!("params"); + + smol::block_on(async { + let foo_method = f.get_method("foo").unwrap(); + assert_eq!(foo_method(params.clone()).await.unwrap(), params); + }); +} diff --git a/net/Cargo.toml b/net/Cargo.toml index fe209cd..304cbb2 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -1,19 +1,43 @@ [package] name = "karyon_net" -version.workspace = true +version.workspace = true edition.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["smol"] +smol = [ + "karyon_core/smol", + "async-tungstenite/async-std-runtime", + "dep:futures-rustls", +] +tokio = [ + "karyon_core/tokio", + "async-tungstenite/tokio-runtime", + "dep:tokio", + "dep:tokio-rustls", +] + + [dependencies] -karyon_core.workspace = true +karyon_core = { workspace = true, default-features = false } -smol = "2.0.0" +pin-project-lite = "0.2.13" async-trait = "0.1.77" log = "0.4.21" -bincode = { version="2.0.0-rc.3", features = ["derive"]} +bincode = { version = "2.0.0-rc.3", features = ["derive"] } thiserror = "1.0.58" url = "2.5.0" -futures-rustls = "0.25.1" -async-tungstenite = "0.25.0" -ws_stream_tungstenite = "0.13.0" +async-tungstenite = { version = "0.25.0", default-features = false } +asynchronous-codec = "0.7.0" +futures-util = "0.3.30" +async-channel = "2.3.0" +rustls-pki-types = "1.7.0" + +futures-rustls = { version = "0.25.1", optional = true } +tokio-rustls = { version = "0.26.0", optional = true } +tokio = { version = "1.37.0", features = ["io-util"], optional = true } + +[dev-dependencies] +smol = "2.0.0" diff --git a/net/examples/tcp_codec.rs b/net/examples/tcp_codec.rs new file mode 100644 index 0000000..93deaae --- /dev/null +++ b/net/examples/tcp_codec.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use karyon_core::async_util::sleep; + +use karyon_net::{ + codec::{Codec, Decoder, Encoder}, + tcp, ConnListener, Connection, Endpoint, Result, +}; + +#[derive(Clone)] +struct NewLineCodec {} + +impl Codec for NewLineCodec { + type Item = String; +} + +impl Encoder for NewLineCodec { + type EnItem = String; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + dst[..src.len()].copy_from_slice(src.as_bytes()); + Ok(src.len()) + } +} + +impl Decoder for NewLineCodec { + type DeItem = String; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + match src.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some((i + 1, String::from_utf8(src[..i].to_vec()).unwrap()))), + None => Ok(None), + } + } +} + +fn main() { + smol::block_on(async { + let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); + + let config = tcp::TcpConfig::default(); + + let listener = tcp::listen(&endpoint, config.clone(), NewLineCodec {}) + .await + .unwrap(); + smol::spawn(async move { + if let Ok(conn) = listener.accept().await { + loop { + let msg = conn.recv().await.unwrap(); + println!("Receive a message: {:?}", msg); + } + }; + }) + .detach(); + + let conn = tcp::dial(&endpoint, config, NewLineCodec {}).await.unwrap(); + conn.send("hello".to_string()).await.unwrap(); + conn.send(" world\n".to_string()).await.unwrap(); + sleep(Duration::from_secs(1)).await; + }); +} diff --git a/net/src/codec/bytes_codec.rs b/net/src/codec/bytes_codec.rs new file mode 100644 index 0000000..b319e53 --- /dev/null +++ b/net/src/codec/bytes_codec.rs @@ -0,0 +1,29 @@ +use crate::{ + codec::{Codec, Decoder, Encoder}, + Result, +}; + +#[derive(Clone)] +pub struct BytesCodec {} +impl Codec for BytesCodec { + type Item = Vec<u8>; +} + +impl Encoder for BytesCodec { + type EnItem = Vec<u8>; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + dst[..src.len()].copy_from_slice(src); + Ok(src.len()) + } +} + +impl Decoder for BytesCodec { + type DeItem = Vec<u8>; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + if src.is_empty() { + Ok(None) + } else { + Ok(Some((src.len(), src.to_vec()))) + } + } +} diff --git a/net/src/codec/length_codec.rs b/net/src/codec/length_codec.rs new file mode 100644 index 0000000..76a1679 --- /dev/null +++ b/net/src/codec/length_codec.rs @@ -0,0 +1,49 @@ +use karyon_core::util::{decode, encode_into_slice}; + +use crate::{ + codec::{Codec, Decoder, Encoder}, + Result, +}; + +/// The size of the message length. +const MSG_LENGTH_SIZE: usize = std::mem::size_of::<u32>(); + +#[derive(Clone)] +pub struct LengthCodec {} +impl Codec for LengthCodec { + type Item = Vec<u8>; +} + +impl Encoder for LengthCodec { + type EnItem = Vec<u8>; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + let length_buf = &mut [0; MSG_LENGTH_SIZE]; + encode_into_slice(&(src.len() as u32), length_buf)?; + dst[..MSG_LENGTH_SIZE].copy_from_slice(length_buf); + dst[MSG_LENGTH_SIZE..src.len() + MSG_LENGTH_SIZE].copy_from_slice(src); + Ok(src.len() + MSG_LENGTH_SIZE) + } +} + +impl Decoder for LengthCodec { + type DeItem = Vec<u8>; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + if src.len() < MSG_LENGTH_SIZE { + return Ok(None); + } + + let mut length = [0; MSG_LENGTH_SIZE]; + length.copy_from_slice(&src[..MSG_LENGTH_SIZE]); + let (length, _) = decode::<u32>(&length)?; + let length = length as usize; + + if src.len() - MSG_LENGTH_SIZE >= length { + Ok(Some(( + length + MSG_LENGTH_SIZE, + src[MSG_LENGTH_SIZE..length + MSG_LENGTH_SIZE].to_vec(), + ))) + } else { + Ok(None) + } + } +} diff --git a/net/src/codec/mod.rs b/net/src/codec/mod.rs new file mode 100644 index 0000000..565cb07 --- /dev/null +++ b/net/src/codec/mod.rs @@ -0,0 +1,25 @@ +mod bytes_codec; +mod length_codec; +mod websocket; + +pub use bytes_codec::BytesCodec; +pub use length_codec::LengthCodec; +pub use websocket::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder}; + +use crate::Result; + +pub trait Codec: + Decoder<DeItem = Self::Item> + Encoder<EnItem = Self::Item> + Send + Sync + 'static + Unpin +{ + type Item: Send + Sync; +} + +pub trait Encoder { + type EnItem; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize>; +} + +pub trait Decoder { + type DeItem; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>>; +} diff --git a/net/src/codec/websocket.rs b/net/src/codec/websocket.rs new file mode 100644 index 0000000..b59a55c --- /dev/null +++ b/net/src/codec/websocket.rs @@ -0,0 +1,23 @@ +use crate::Result; +use async_tungstenite::tungstenite::Message; + +pub trait WebSocketCodec: + WebSocketDecoder<DeItem = Self::Item> + + WebSocketEncoder<EnItem = Self::Item> + + Send + + Sync + + 'static + + Unpin +{ + type Item: Send + Sync; +} + +pub trait WebSocketEncoder { + type EnItem; + fn encode(&self, src: &Self::EnItem) -> Result<Message>; +} + +pub trait WebSocketDecoder { + type DeItem; + fn decode(&self, src: &Message) -> Result<Self::DeItem>; +} diff --git a/net/src/connection.rs b/net/src/connection.rs index fa4640f..bbd21de 100644 --- a/net/src/connection.rs +++ b/net/src/connection.rs @@ -1,65 +1,34 @@ use async_trait::async_trait; -use crate::{ - transports::{tcp, udp, unix}, - Endpoint, Error, Result, -}; +use crate::{Endpoint, Result}; /// Alias for `Box<dyn Connection>` -pub type Conn = Box<dyn Connection>; +pub type Conn<T> = Box<dyn Connection<Item = T>>; /// A trait for objects which can be converted to [`Conn`]. pub trait ToConn { - fn to_conn(self) -> Conn; + type Item; + fn to_conn(self) -> Conn<Self::Item>; } /// Connection is a generic network connection interface for -/// [`udp::UdpConn`], [`tcp::TcpConn`], and [`unix::UnixConn`]. +/// [`udp::UdpConn`], [`tcp::TcpConn`], [`tls::TlsConn`], [`ws::WsConn`], +/// and [`unix::UnixConn`]. /// /// If you are familiar with the Go language, this is similar to the /// [Conn](https://pkg.go.dev/net#Conn) interface #[async_trait] pub trait Connection: Send + Sync { + type Item; /// Returns the remote peer endpoint of this connection fn peer_endpoint(&self) -> Result<Endpoint>; /// Returns the local socket endpoint of this connection fn local_endpoint(&self) -> Result<Endpoint>; - /// Reads data from this connection. - async fn read(&self, buf: &mut [u8]) -> Result<usize>; + /// Recvs data from this connection. + async fn recv(&self) -> Result<Self::Item>; - /// Writes data to this connection - async fn write(&self, buf: &[u8]) -> Result<usize>; -} - -/// Connects to the provided endpoint. -/// -/// it only supports `tcp4/6`, `udp4/6`, and `unix`. -/// -/// #Example -/// -/// ``` -/// use karyon_net::{Endpoint, dial}; -/// -/// async { -/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); -/// -/// let conn = dial(&endpoint).await.unwrap(); -/// -/// conn.write(b"MSG").await.unwrap(); -/// -/// let mut buffer = [0;32]; -/// conn.read(&mut buffer).await.unwrap(); -/// }; -/// -/// ``` -/// -pub async fn dial(endpoint: &Endpoint) -> Result<Conn> { - match endpoint { - Endpoint::Tcp(_, _) => Ok(Box::new(tcp::dial(endpoint).await?)), - Endpoint::Udp(_, _) => Ok(Box::new(udp::dial(endpoint).await?)), - Endpoint::Unix(addr) => Ok(Box::new(unix::dial(addr).await?)), - _ => Err(Error::InvalidEndpoint(endpoint.to_string())), - } + /// Sends data to this connection + async fn send(&self, msg: Self::Item) -> Result<()>; } diff --git a/net/src/endpoint.rs b/net/src/endpoint.rs index 9193628..0c7ecd1 100644 --- a/net/src/endpoint.rs +++ b/net/src/endpoint.rs @@ -1,10 +1,11 @@ use std::{ net::{IpAddr, SocketAddr}, - os::unix::net::SocketAddr as UnixSocketAddress, path::PathBuf, str::FromStr, }; +use std::os::unix::net::SocketAddr as UnixSocketAddr; + use bincode::{Decode, Encode}; use url::Url; @@ -25,7 +26,7 @@ pub type Port = u16; /// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); /// /// let socketaddr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); -/// let endpoint = Endpoint::new_udp_addr(&socketaddr); +/// let endpoint = Endpoint::new_udp_addr(socketaddr); /// /// ``` /// @@ -35,7 +36,8 @@ pub enum Endpoint { Tcp(Addr, Port), Tls(Addr, Port), Ws(Addr, Port), - Unix(String), + Wss(Addr, Port), + Unix(PathBuf), } impl std::fmt::Display for Endpoint { @@ -53,12 +55,11 @@ impl std::fmt::Display for Endpoint { Endpoint::Ws(ip, port) => { write!(f, "ws://{}:{}", ip, port) } + Endpoint::Wss(ip, port) => { + write!(f, "wss://{}:{}", ip, port) + } Endpoint::Unix(path) => { - if path.is_empty() { - write!(f, "unix:/UNNAMED") - } else { - write!(f, "unix:/{}", path) - } + write!(f, "unix:/{}", path.to_string_lossy()) } } } @@ -71,7 +72,8 @@ impl TryFrom<Endpoint> for SocketAddr { Endpoint::Udp(ip, port) | Endpoint::Tcp(ip, port) | Endpoint::Tls(ip, port) - | Endpoint::Ws(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), + | Endpoint::Ws(ip, port) + | Endpoint::Wss(ip, port) => Ok(SocketAddr::new(ip.try_into()?, port)), Endpoint::Unix(_) => Err(Error::TryFromEndpoint), } } @@ -87,11 +89,11 @@ impl TryFrom<Endpoint> for PathBuf { } } -impl TryFrom<Endpoint> for UnixSocketAddress { +impl TryFrom<Endpoint> for UnixSocketAddr { type Error = Error; - fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddress, Self::Error> { + fn try_from(endpoint: Endpoint) -> std::result::Result<UnixSocketAddr, Self::Error> { match endpoint { - Endpoint::Unix(a) => Ok(UnixSocketAddress::from_pathname(a)?), + Endpoint::Unix(a) => Ok(UnixSocketAddr::from_pathname(a)?), _ => Err(Error::TryFromEndpoint), } } @@ -124,6 +126,7 @@ impl FromStr for Endpoint { "udp" => Ok(Endpoint::Udp(addr, port)), "tls" => Ok(Endpoint::Tls(addr, port)), "ws" => Ok(Endpoint::Ws(addr, port)), + "wss" => Ok(Endpoint::Wss(addr, port)), _ => Err(Error::InvalidEndpoint(s.to_string())), } } else { @@ -132,7 +135,7 @@ impl FromStr for Endpoint { } match url.scheme() { - "unix" => Ok(Endpoint::Unix(url.path().to_string())), + "unix" => Ok(Endpoint::Unix(url.path().into())), _ => Err(Error::InvalidEndpoint(s.to_string())), } } @@ -141,33 +144,33 @@ impl FromStr for Endpoint { impl Endpoint { /// Creates a new TCP endpoint from a `SocketAddr`. - pub fn new_tcp_addr(addr: &SocketAddr) -> Endpoint { + pub fn new_tcp_addr(addr: SocketAddr) -> Endpoint { Endpoint::Tcp(Addr::Ip(addr.ip()), addr.port()) } /// Creates a new UDP endpoint from a `SocketAddr`. - pub fn new_udp_addr(addr: &SocketAddr) -> Endpoint { + pub fn new_udp_addr(addr: SocketAddr) -> Endpoint { Endpoint::Udp(Addr::Ip(addr.ip()), addr.port()) } /// Creates a new TLS endpoint from a `SocketAddr`. - pub fn new_tls_addr(addr: &SocketAddr) -> Endpoint { + pub fn new_tls_addr(addr: SocketAddr) -> Endpoint { Endpoint::Tls(Addr::Ip(addr.ip()), addr.port()) } /// Creates a new WS endpoint from a `SocketAddr`. - pub fn new_ws_addr(addr: &SocketAddr) -> Endpoint { + pub fn new_ws_addr(addr: SocketAddr) -> Endpoint { Endpoint::Ws(Addr::Ip(addr.ip()), addr.port()) } - /// Creates a new Unix endpoint from a `UnixSocketAddress`. - pub fn new_unix_addr(addr: &UnixSocketAddress) -> Endpoint { - Endpoint::Unix( - addr.as_pathname() - .and_then(|a| a.to_str()) - .unwrap_or("") - .to_string(), - ) + /// Creates a new WSS endpoint from a `SocketAddr`. + pub fn new_wss_addr(addr: SocketAddr) -> Endpoint { + Endpoint::Wss(Addr::Ip(addr.ip()), addr.port()) + } + + /// Creates a new Unix endpoint from a `UnixSocketAddr`. + pub fn new_unix_addr(addr: &std::path::Path) -> Endpoint { + Endpoint::Unix(addr.to_path_buf()) } /// Returns the `Port` of the endpoint. @@ -176,7 +179,8 @@ impl Endpoint { Endpoint::Tcp(_, port) | Endpoint::Udp(_, port) | Endpoint::Tls(_, port) - | Endpoint::Ws(_, port) => Ok(port), + | Endpoint::Ws(_, port) + | Endpoint::Wss(_, port) => Ok(port), _ => Err(Error::TryFromEndpoint), } } @@ -187,7 +191,8 @@ impl Endpoint { Endpoint::Tcp(addr, _) | Endpoint::Udp(addr, _) | Endpoint::Tls(addr, _) - | Endpoint::Ws(addr, _) => Ok(addr), + | Endpoint::Ws(addr, _) + | Endpoint::Wss(addr, _) => Ok(addr), _ => Err(Error::TryFromEndpoint), } } @@ -223,10 +228,27 @@ impl std::fmt::Display for Addr { } } +pub trait ToEndpoint { + fn to_endpoint(&self) -> Result<Endpoint>; +} + +impl ToEndpoint for String { + fn to_endpoint(&self) -> Result<Endpoint> { + Endpoint::from_str(self) + } +} + +impl ToEndpoint for &str { + fn to_endpoint(&self) -> Result<Endpoint> { + Endpoint::from_str(self) + } +} + #[cfg(test)] mod tests { use super::*; use std::net::Ipv4Addr; + use std::path::PathBuf; #[test] fn test_endpoint_from_str() { @@ -243,7 +265,7 @@ mod tests { assert_eq!(endpoint_str, endpoint); let endpoint_str = "unix:/home/x/s.socket".parse::<Endpoint>().unwrap(); - let endpoint = Endpoint::Unix("/home/x/s.socket".to_string()); + let endpoint = Endpoint::Unix(PathBuf::from_str("/home/x/s.socket").unwrap()); assert_eq!(endpoint_str, endpoint); } } diff --git a/net/src/error.rs b/net/src/error.rs index 6e04a12..ee93168 100644 --- a/net/src/error.rs +++ b/net/src/error.rs @@ -13,9 +13,18 @@ pub enum Error { #[error("invalid address {0}")] InvalidAddress(String), + #[error("invalid path {0}")] + InvalidPath(String), + #[error("invalid endpoint {0}")] InvalidEndpoint(String), + #[error("Encode error: {0}")] + Encode(String), + + #[error("Decode error: {0}")] + Decode(String), + #[error("Parse endpoint error {0}")] ParseEndpoint(String), @@ -26,23 +35,28 @@ pub enum Error { ChannelSend(String), #[error(transparent)] - ChannelRecv(#[from] smol::channel::RecvError), + ChannelRecv(#[from] async_channel::RecvError), #[error("Ws Error: {0}")] WsError(#[from] async_tungstenite::tungstenite::Error), + #[cfg(feature = "smol")] #[error("Tls Error: {0}")] Rustls(#[from] futures_rustls::rustls::Error), + #[cfg(feature = "tokio")] + #[error("Tls Error: {0}")] + Rustls(#[from] tokio_rustls::rustls::Error), + #[error("Invalid DNS Name: {0}")] - InvalidDnsNameError(#[from] futures_rustls::pki_types::InvalidDnsNameError), + InvalidDnsNameError(#[from] rustls_pki_types::InvalidDnsNameError), #[error(transparent)] - KaryonCore(#[from] karyon_core::error::Error), + KaryonCore(#[from] karyon_core::Error), } -impl<T> From<smol::channel::SendError<T>> for Error { - fn from(error: smol::channel::SendError<T>) -> Self { +impl<T> From<async_channel::SendError<T>> for Error { + fn from(error: async_channel::SendError<T>) -> Self { Error::ChannelSend(error.to_string()) } } diff --git a/net/src/lib.rs b/net/src/lib.rs index c1d72b2..ddb53cf 100644 --- a/net/src/lib.rs +++ b/net/src/lib.rs @@ -1,20 +1,20 @@ +pub mod codec; mod connection; mod endpoint; mod error; mod listener; +mod stream; mod transports; pub use { - connection::{dial, Conn, Connection, ToConn}, - endpoint::{Addr, Endpoint, Port}, - listener::{listen, ConnListener, Listener, ToListener}, + connection::{Conn, Connection, ToConn}, + endpoint::{Addr, Endpoint, Port, ToEndpoint}, + listener::{ConnListener, Listener, ToListener}, transports::{tcp, tls, udp, unix, ws}, }; -use error::{Error, Result}; - /// Represents karyon's Net Error -pub use error::Error as NetError; +pub use error::Error; /// Represents karyon's Net Result -pub use error::Result as NetResult; +pub use error::Result; diff --git a/net/src/listener.rs b/net/src/listener.rs index 4511212..469f5e9 100644 --- a/net/src/listener.rs +++ b/net/src/listener.rs @@ -1,46 +1,21 @@ use async_trait::async_trait; -use crate::{ - transports::{tcp, unix}, - Conn, Endpoint, Error, Result, -}; +use crate::{Conn, Endpoint, Result}; /// Alias for `Box<dyn ConnListener>` -pub type Listener = Box<dyn ConnListener>; +pub type Listener<T> = Box<dyn ConnListener<Item = T>>; /// A trait for objects which can be converted to [`Listener`]. pub trait ToListener { - fn to_listener(self) -> Listener; + type Item; + fn to_listener(self) -> Listener<Self::Item>; } -/// ConnListener is a generic network listener. +/// ConnListener is a generic network listener interface for +/// [`tcp::TcpConn`], [`tls::TlsConn`], [`ws::WsConn`], and [`unix::UnixConn`]. #[async_trait] pub trait ConnListener: Send + Sync { + type Item; fn local_endpoint(&self) -> Result<Endpoint>; - async fn accept(&self) -> Result<Conn>; -} - -/// Listens to the provided endpoint. -/// -/// it only supports `tcp4/6`, and `unix`. -/// -/// #Example -/// -/// ``` -/// use karyon_net::{Endpoint, listen}; -/// -/// async { -/// let endpoint: Endpoint = "tcp://127.0.0.1:3000".parse().unwrap(); -/// -/// let listener = listen(&endpoint).await.unwrap(); -/// let conn = listener.accept().await.unwrap(); -/// }; -/// -/// ``` -pub async fn listen(endpoint: &Endpoint) -> Result<Box<dyn ConnListener>> { - match endpoint { - Endpoint::Tcp(_, _) => Ok(Box::new(tcp::listen(endpoint).await?)), - Endpoint::Unix(addr) => Ok(Box::new(unix::listen(addr)?)), - _ => Err(Error::InvalidEndpoint(endpoint.to_string())), - } + async fn accept(&self) -> Result<Conn<Self::Item>>; } diff --git a/net/src/stream/buffer.rs b/net/src/stream/buffer.rs new file mode 100644 index 0000000..f211600 --- /dev/null +++ b/net/src/stream/buffer.rs @@ -0,0 +1,82 @@ +#[derive(Debug)] +pub struct Buffer<B> { + inner: B, + len: usize, + cap: usize, +} + +impl<B> Buffer<B> +where + B: AsMut<[u8]> + AsRef<[u8]>, +{ + /// Constructs a new, empty Buffer<B>. + pub fn new(b: B) -> Self { + Self { + cap: b.as_ref().len(), + inner: b, + len: 0, + } + } + + /// Returns the number of elements in the buffer. + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.len + } + + /// Resizes the buffer in-place so that `len` is equal to `new_size`. + pub fn resize(&mut self, new_size: usize) { + assert!(self.cap > new_size); + self.len = new_size; + } + + /// Appends all elements in a slice to the buffer. + pub fn extend_from_slice(&mut self, bytes: &[u8]) { + let old_len = self.len; + self.resize(self.len + bytes.len()); + self.inner.as_mut()[old_len..bytes.len() + old_len].copy_from_slice(bytes); + } + + /// Shortens the buffer, dropping the first `cnt` bytes and keeping the + /// rest. + pub fn advance(&mut self, cnt: usize) { + assert!(self.len >= cnt); + self.inner.as_mut().rotate_left(cnt); + self.resize(self.len - cnt); + } + + /// Returns `true` if the buffer contains no elements. + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +impl<B> AsMut<[u8]> for Buffer<B> +where + B: AsMut<[u8]> + AsRef<[u8]>, +{ + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[..self.len] + } +} + +impl<B> AsRef<[u8]> for Buffer<B> +where + B: AsMut<[u8]> + AsRef<[u8]>, +{ + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[..self.len] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_buffer_advance() { + let mut buf = Buffer::new([0u8; 32]); + buf.extend_from_slice(&[1, 2, 3]); + assert_eq!([1, 2, 3], buf.as_ref()); + } +} diff --git a/net/src/stream/mod.rs b/net/src/stream/mod.rs new file mode 100644 index 0000000..9493b29 --- /dev/null +++ b/net/src/stream/mod.rs @@ -0,0 +1,191 @@ +mod buffer; +mod websocket; + +pub use websocket::WsStream; + +use std::{ + io::ErrorKind, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{ + ready, + stream::{Stream, StreamExt}, + Sink, +}; +use pin_project_lite::pin_project; + +use karyon_core::async_runtime::io::{AsyncRead, AsyncWrite}; + +use crate::{ + codec::{Decoder, Encoder}, + Error, Result, +}; + +use buffer::Buffer; + +const BUFFER_SIZE: usize = 2048 * 2024; // 4MB +const INITIAL_BUFFER_SIZE: usize = 1024 * 1024; // 1MB + +pub struct ReadStream<T, C> { + inner: T, + decoder: C, + buffer: Buffer<[u8; BUFFER_SIZE]>, +} + +impl<T, C> ReadStream<T, C> +where + T: AsyncRead + Unpin, + C: Decoder + Unpin, +{ + pub fn new(inner: T, decoder: C) -> Self { + Self { + inner, + decoder, + buffer: Buffer::new([0u8; BUFFER_SIZE]), + } + } + + pub async fn recv(&mut self) -> Result<C::DeItem> { + match self.next().await { + Some(m) => m, + None => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())), + } + } +} + +pin_project! { + pub struct WriteStream<T, C> { + #[pin] + inner: T, + encoder: C, + high_water_mark: usize, + buffer: Buffer<[u8; BUFFER_SIZE]>, + } +} + +impl<T, C> WriteStream<T, C> +where + T: AsyncWrite + Unpin, + C: Encoder + Unpin, +{ + pub fn new(inner: T, encoder: C) -> Self { + Self { + inner, + encoder, + high_water_mark: 131072, + buffer: Buffer::new([0u8; BUFFER_SIZE]), + } + } +} + +impl<T, C> Stream for ReadStream<T, C> +where + T: AsyncRead + Unpin, + C: Decoder + Unpin, +{ + type Item = Result<C::DeItem>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = &mut *self; + + if let Some((n, item)) = this.decoder.decode(this.buffer.as_mut())? { + this.buffer.advance(n); + return Poll::Ready(Some(Ok(item))); + } + + let mut buf = [0u8; INITIAL_BUFFER_SIZE]; + #[cfg(feature = "tokio")] + let mut buf = tokio::io::ReadBuf::new(&mut buf); + + loop { + #[cfg(feature = "smol")] + let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buf))?; + #[cfg(feature = "smol")] + let bytes = &buf[..n]; + + #[cfg(feature = "tokio")] + ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buf))?; + #[cfg(feature = "tokio")] + let bytes = buf.filled(); + #[cfg(feature = "tokio")] + let n = bytes.len(); + + this.buffer.extend_from_slice(bytes); + + match this.decoder.decode(this.buffer.as_mut())? { + Some((cn, item)) => { + this.buffer.advance(cn); + return Poll::Ready(Some(Ok(item))); + } + None if n == 0 => { + if this.buffer.is_empty() { + return Poll::Ready(None); + } else { + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "bytes remaining in read stream", + ) + .into()))); + } + } + _ => continue, + } + } + } +} + +impl<T, C> Sink<C::EnItem> for WriteStream<T, C> +where + T: AsyncWrite + Unpin, + C: Encoder + Unpin, +{ + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { + let this = &mut *self; + while !this.buffer.is_empty() { + let n = ready!(Pin::new(&mut this.inner).poll_write(cx, this.buffer.as_ref()))?; + + if n == 0 { + return Poll::Ready(Err(std::io::Error::new( + ErrorKind::UnexpectedEof, + "End of file", + ) + .into())); + } + + this.buffer.advance(n); + } + + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: C::EnItem) -> Result<()> { + let this = &mut *self; + let mut buf = [0u8; INITIAL_BUFFER_SIZE]; + let n = this.encoder.encode(&item, &mut buf)?; + this.buffer.extend_from_slice(&buf[..n]); + Ok(()) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<std::result::Result<(), Self::Error>> { + ready!(self.as_mut().poll_ready(cx))?; + self.project().inner.poll_flush(cx).map_err(Into::into) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<std::result::Result<(), Self::Error>> { + ready!(self.as_mut().poll_flush(cx))?; + #[cfg(feature = "smol")] + return self.project().inner.poll_close(cx).map_err(Error::from); + #[cfg(feature = "tokio")] + return self.project().inner.poll_shutdown(cx).map_err(Error::from); + } +} diff --git a/net/src/stream/websocket.rs b/net/src/stream/websocket.rs new file mode 100644 index 0000000..2552eaf --- /dev/null +++ b/net/src/stream/websocket.rs @@ -0,0 +1,107 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use async_tungstenite::tungstenite::Message; +use futures_util::{Sink, SinkExt, Stream, StreamExt}; + +#[cfg(feature = "smol")] +use futures_rustls::TlsStream; +#[cfg(feature = "tokio")] +use tokio_rustls::TlsStream; + +use karyon_core::async_runtime::net::TcpStream; + +use crate::{codec::WebSocketCodec, Error, Result}; + +#[cfg(feature = "tokio")] +type WebSocketStream<T> = + async_tungstenite::WebSocketStream<async_tungstenite::tokio::TokioAdapter<T>>; +#[cfg(feature = "smol")] +use async_tungstenite::WebSocketStream; + +pub struct WsStream<C> { + inner: InnerWSConn, + codec: C, +} + +impl<C> WsStream<C> +where + C: WebSocketCodec, +{ + pub fn new_ws(conn: WebSocketStream<TcpStream>, codec: C) -> Self { + Self { + inner: InnerWSConn::Plain(conn), + codec, + } + } + + pub fn new_wss(conn: WebSocketStream<TlsStream<TcpStream>>, codec: C) -> Self { + Self { + inner: InnerWSConn::Tls(conn), + codec, + } + } + + pub async fn recv(&mut self) -> Result<C::Item> { + match self.inner.next().await { + Some(msg) => self.codec.decode(&msg?), + None => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())), + } + } + + pub async fn send(&mut self, msg: C::Item) -> Result<()> { + let ws_msg = self.codec.encode(&msg)?; + self.inner.send(ws_msg).await + } +} + +enum InnerWSConn { + Plain(WebSocketStream<TcpStream>), + Tls(WebSocketStream<TlsStream<TcpStream>>), +} + +impl Sink<Message> for InnerWSConn { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + match &mut *self { + InnerWSConn::Plain(s) => Pin::new(s).poll_ready(cx).map_err(Error::from), + InnerWSConn::Tls(s) => Pin::new(s).poll_ready(cx).map_err(Error::from), + } + } + + fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<()> { + match &mut *self { + InnerWSConn::Plain(s) => Pin::new(s).start_send(item).map_err(Error::from), + InnerWSConn::Tls(s) => Pin::new(s).start_send(item).map_err(Error::from), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + match &mut *self { + InnerWSConn::Plain(s) => Pin::new(s).poll_flush(cx).map_err(Error::from), + InnerWSConn::Tls(s) => Pin::new(s).poll_flush(cx).map_err(Error::from), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + match &mut *self { + InnerWSConn::Plain(s) => Pin::new(s).poll_close(cx).map_err(Error::from), + InnerWSConn::Tls(s) => Pin::new(s).poll_close(cx).map_err(Error::from), + } + .map_err(Error::from) + } +} + +impl Stream for InnerWSConn { + type Item = Result<Message>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match &mut *self { + InnerWSConn::Plain(s) => Pin::new(s).poll_next(cx).map_err(Error::from), + InnerWSConn::Tls(s) => Pin::new(s).poll_next(cx).map_err(Error::from), + } + } +} diff --git a/net/src/transports/tcp.rs b/net/src/transports/tcp.rs index 21fce3d..03c8ab2 100644 --- a/net/src/transports/tcp.rs +++ b/net/src/transports/tcp.rs @@ -1,116 +1,184 @@ use std::net::SocketAddr; use async_trait::async_trait; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use futures_util::SinkExt; + +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, - net::{TcpListener, TcpStream}, + net::{TcpListener as AsyncTcpListener, TcpStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, - Error, Result, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, + Result, }; -/// TCP network connection implementation of the [`Connection`] trait. -pub struct TcpConn { - inner: TcpStream, - read: Mutex<ReadHalf<TcpStream>>, - write: Mutex<WriteHalf<TcpStream>>, +/// TCP configuration +#[derive(Clone)] +pub struct TcpConfig { + pub nodelay: bool, +} + +impl Default for TcpConfig { + fn default() -> Self { + Self { nodelay: true } + } +} + +/// TCP connection implementation of the [`Connection`] trait. +pub struct TcpConn<C> { + read_stream: Mutex<ReadStream<ReadHalf<TcpStream>, C>>, + write_stream: Mutex<WriteStream<WriteHalf<TcpStream>, C>>, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, } -impl TcpConn { +impl<C> TcpConn<C> +where + C: Codec + Clone, +{ /// Creates a new TcpConn - pub fn new(conn: TcpStream) -> Self { - let (read, write) = split(conn.clone()); + pub fn new( + socket: TcpStream, + codec: C, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, + ) -> Self { + let (read, write) = split(socket); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for TcpConn { +impl<C> Connection for TcpConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.inner.peer_addr()?)) + Ok(self.peer_endpoint.clone()) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.inner.local_addr()?)) + Ok(self.local_endpoint.clone()) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await + } +} + +pub struct TcpListener<C> { + inner: AsyncTcpListener, + config: TcpConfig, + codec: C, +} + +impl<C> TcpListener<C> +where + C: Codec, +{ + pub fn new(listener: AsyncTcpListener, config: TcpConfig, codec: C) -> Self { + Self { + inner: listener, + config: config.clone(), + codec, + } } } #[async_trait] -impl ConnListener for TcpListener { +impl<C> ConnListener for TcpListener<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tcp_addr(&self.local_addr()?)) + Ok(Endpoint::new_tcp_addr(self.inner.local_addr()?)) } - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (conn, _) = self.accept().await?; - conn.set_nodelay(true)?; - Ok(Box::new(TcpConn::new(conn))) + async fn accept(&self) -> Result<Conn<C::Item>> { + let (socket, _) = self.inner.accept().await?; + socket.set_nodelay(self.config.nodelay)?; + + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tcp_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tcp_addr)?; + + Ok(Box::new(TcpConn::new( + socket, + self.codec.clone(), + peer_endpoint, + local_endpoint, + ))) } } /// Connects to the given TCP address and port. -pub async fn dial(endpoint: &Endpoint) -> Result<TcpConn> { +pub async fn dial<C>(endpoint: &Endpoint, config: TcpConfig, codec: C) -> Result<TcpConn<C>> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let conn = TcpStream::connect(addr).await?; - conn.set_nodelay(true)?; - Ok(TcpConn::new(conn)) + let socket = TcpStream::connect(addr).await?; + socket.set_nodelay(config.nodelay)?; + + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tcp_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tcp_addr)?; + + Ok(TcpConn::new(socket, codec, peer_endpoint, local_endpoint)) } /// Listens on the given TCP address and port. -pub async fn listen(endpoint: &Endpoint) -> Result<TcpListener> { +pub async fn listen<C>(endpoint: &Endpoint, config: TcpConfig, codec: C) -> Result<TcpListener<C>> +where + C: Codec, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let listener = TcpListener::bind(addr).await?; - Ok(listener) -} - -impl From<TcpStream> for Box<dyn Connection> { - fn from(conn: TcpStream) -> Self { - Box::new(TcpConn::new(conn)) - } + let listener = AsyncTcpListener::bind(addr).await?; + Ok(TcpListener::new(listener, config, codec)) } -impl From<TcpListener> for Box<dyn ConnListener> { - fn from(listener: TcpListener) -> Self { +impl<C> From<TcpListener<C>> for Box<dyn ConnListener<Item = C::Item>> +where + C: Clone + Codec, +{ + fn from(listener: TcpListener<C>) -> Self { Box::new(listener) } } -impl ToConn for TcpStream { - fn to_conn(self) -> Box<dyn Connection> { - self.into() - } -} - -impl ToConn for TcpConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for TcpConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } -impl ToListener for TcpListener { - fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for TcpListener<C> +where + C: Clone + Codec, +{ + type Item = C::Item; + fn to_listener(self) -> Listener<Self::Item> { self.into() } } diff --git a/net/src/transports/tls.rs b/net/src/transports/tls.rs index 476f495..c972f63 100644 --- a/net/src/transports/tls.rs +++ b/net/src/transports/tls.rs @@ -1,138 +1,218 @@ use std::{net::SocketAddr, sync::Arc}; use async_trait::async_trait; -use futures_rustls::{pki_types, rustls, TlsAcceptor, TlsConnector, TlsStream}; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use futures_util::SinkExt; +use rustls_pki_types as pki_types; + +#[cfg(feature = "smol")] +use futures_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream}; +#[cfg(feature = "tokio")] +use tokio_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream}; + +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, net::{TcpListener, TcpStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, - Error, Result, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, + Result, }; +use super::tcp::TcpConfig; + +/// TLS configuration +#[derive(Clone)] +pub struct ServerTlsConfig { + pub tcp_config: TcpConfig, + pub server_config: rustls::ServerConfig, +} + +#[derive(Clone)] +pub struct ClientTlsConfig { + pub tcp_config: TcpConfig, + pub client_config: rustls::ClientConfig, + pub dns_name: String, +} + /// TLS network connection implementation of the [`Connection`] trait. -pub struct TlsConn { - inner: TcpStream, - read: Mutex<ReadHalf<TlsStream<TcpStream>>>, - write: Mutex<WriteHalf<TlsStream<TcpStream>>>, +pub struct TlsConn<C> { + read_stream: Mutex<ReadStream<ReadHalf<TlsStream<TcpStream>>, C>>, + write_stream: Mutex<WriteStream<WriteHalf<TlsStream<TcpStream>>, C>>, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, } -impl TlsConn { +impl<C> TlsConn<C> +where + C: Codec + Clone, +{ /// Creates a new TlsConn - pub fn new(sock: TcpStream, conn: TlsStream<TcpStream>) -> Self { + pub fn new( + conn: TlsStream<TcpStream>, + codec: C, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, + ) -> Self { let (read, write) = split(conn); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: sock, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for TlsConn { +impl<C> Connection for TlsConn<C> +where + C: Clone + Codec, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tls_addr(&self.inner.peer_addr()?)) + Ok(self.peer_endpoint.clone()) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tls_addr(&self.inner.local_addr()?)) + Ok(self.local_endpoint.clone()) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await } } /// Connects to the given TLS address and port. -pub async fn dial( - endpoint: &Endpoint, - config: rustls::ClientConfig, - dns_name: &'static str, -) -> Result<TlsConn> { +pub async fn dial<C>(endpoint: &Endpoint, config: ClientTlsConfig, codec: C) -> Result<TlsConn<C>> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let connector = TlsConnector::from(Arc::new(config)); + let connector = TlsConnector::from(Arc::new(config.client_config.clone())); + + let socket = TcpStream::connect(addr).await?; + socket.set_nodelay(config.tcp_config.nodelay)?; - let sock = TcpStream::connect(addr).await?; - sock.set_nodelay(true)?; + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tls_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tls_addr)?; - let altname = pki_types::ServerName::try_from(dns_name)?; - let conn = connector.connect(altname, sock.clone()).await?; - Ok(TlsConn::new(sock, TlsStream::Client(conn))) + let altname = pki_types::ServerName::try_from(config.dns_name.clone())?; + let conn = connector.connect(altname, socket).await?; + Ok(TlsConn::new( + TlsStream::Client(conn), + codec, + peer_endpoint, + local_endpoint, + )) } /// Tls network listener implementation of the `Listener` [`ConnListener`] trait. -pub struct TlsListener { +pub struct TlsListener<C> { inner: TcpListener, acceptor: TlsAcceptor, + config: ServerTlsConfig, + codec: C, +} + +impl<C> TlsListener<C> +where + C: Codec + Clone, +{ + pub fn new( + acceptor: TlsAcceptor, + listener: TcpListener, + config: ServerTlsConfig, + codec: C, + ) -> Self { + Self { + inner: listener, + acceptor, + config: config.clone(), + codec, + } + } } #[async_trait] -impl ConnListener for TlsListener { +impl<C> ConnListener for TlsListener<C> +where + C: Clone + Codec, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_tls_addr(&self.inner.local_addr()?)) + Ok(Endpoint::new_tls_addr(self.inner.local_addr()?)) } - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (sock, _) = self.inner.accept().await?; - sock.set_nodelay(true)?; - let conn = self.acceptor.accept(sock.clone()).await?; - Ok(Box::new(TlsConn::new(sock, TlsStream::Server(conn)))) + async fn accept(&self) -> Result<Conn<C::Item>> { + let (socket, _) = self.inner.accept().await?; + socket.set_nodelay(self.config.tcp_config.nodelay)?; + + let peer_endpoint = socket.peer_addr().map(Endpoint::new_tls_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_tls_addr)?; + + let conn = self.acceptor.accept(socket).await?; + Ok(Box::new(TlsConn::new( + TlsStream::Server(conn), + self.codec.clone(), + peer_endpoint, + local_endpoint, + ))) } } /// Listens on the given TLS address and port. -pub async fn listen(endpoint: &Endpoint, config: rustls::ServerConfig) -> Result<TlsListener> { +pub async fn listen<C>( + endpoint: &Endpoint, + config: ServerTlsConfig, + codec: C, +) -> Result<TlsListener<C>> +where + C: Clone + Codec, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let acceptor = TlsAcceptor::from(Arc::new(config)); + let acceptor = TlsAcceptor::from(Arc::new(config.server_config.clone())); let listener = TcpListener::bind(addr).await?; - Ok(TlsListener { - acceptor, - inner: listener, - }) -} - -impl From<TlsStream<TcpStream>> for Box<dyn Connection> { - fn from(conn: TlsStream<TcpStream>) -> Self { - Box::new(TlsConn::new(conn.get_ref().0.clone(), conn)) - } + Ok(TlsListener::new(acceptor, listener, config, codec)) } -impl From<TlsListener> for Box<dyn ConnListener> { - fn from(listener: TlsListener) -> Self { +impl<C> From<TlsListener<C>> for Listener<C::Item> +where + C: Codec + Clone, +{ + fn from(listener: TlsListener<C>) -> Self { Box::new(listener) } } -impl ToConn for TlsStream<TcpStream> { - fn to_conn(self) -> Box<dyn Connection> { - self.into() - } -} - -impl ToConn for TlsConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for TlsConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } -impl ToListener for TlsListener { - fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for TlsListener<C> +where + C: Clone + Codec, +{ + type Item = C::Item; + fn to_listener(self) -> Listener<Self::Item> { self.into() } } diff --git a/net/src/transports/udp.rs b/net/src/transports/udp.rs index bd1fe83..950537c 100644 --- a/net/src/transports/udp.rs +++ b/net/src/transports/udp.rs @@ -1,93 +1,111 @@ use std::net::SocketAddr; use async_trait::async_trait; -use smol::net::UdpSocket; +use karyon_core::async_runtime::net::UdpSocket; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, Error, Result, }; +const BUFFER_SIZE: usize = 64 * 1024; + +/// UDP configuration +#[derive(Default)] +pub struct UdpConfig {} + /// UDP network connection implementation of the [`Connection`] trait. -pub struct UdpConn { +#[allow(dead_code)] +pub struct UdpConn<C> { inner: UdpSocket, + codec: C, + config: UdpConfig, } -impl UdpConn { +impl<C> UdpConn<C> +where + C: Codec + Clone, +{ /// Creates a new UdpConn - pub fn new(conn: UdpSocket) -> Self { - Self { inner: conn } - } -} - -impl UdpConn { - /// Receives a single datagram message. Returns the number of bytes read - /// and the origin endpoint. - pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Endpoint)> { - let (size, addr) = self.inner.recv_from(buf).await?; - Ok((size, Endpoint::new_udp_addr(&addr))) - } - - /// Sends data to the given address. Returns the number of bytes written. - pub async fn send_to(&self, buf: &[u8], addr: &Endpoint) -> Result<usize> { - let addr: SocketAddr = addr.clone().try_into()?; - let size = self.inner.send_to(buf, addr).await?; - Ok(size) + fn new(socket: UdpSocket, config: UdpConfig, codec: C) -> Self { + Self { + inner: socket, + codec, + config, + } } } #[async_trait] -impl Connection for UdpConn { +impl<C> Connection for UdpConn<C> +where + C: Codec + Clone, +{ + type Item = (C::Item, Endpoint); fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_udp_addr(&self.inner.peer_addr()?)) + self.inner + .peer_addr() + .map(Endpoint::new_udp_addr) + .map_err(Error::from) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_udp_addr(&self.inner.local_addr()?)) + self.inner + .local_addr() + .map(Endpoint::new_udp_addr) + .map_err(Error::from) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.inner.recv(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + let mut buf = [0u8; BUFFER_SIZE]; + let (_, addr) = self.inner.recv_from(&mut buf).await?; + match self.codec.decode(&mut buf)? { + Some((_, item)) => Ok((item, Endpoint::new_udp_addr(addr))), + None => Err(Error::Decode("Unable to decode".into())), + } } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.inner.send(buf).await.map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + let (msg, out_addr) = msg; + let mut buf = [0u8; BUFFER_SIZE]; + self.codec.encode(&msg, &mut buf)?; + let addr: SocketAddr = out_addr.try_into()?; + self.inner.send_to(&buf, addr).await?; + Ok(()) } } /// Connects to the given UDP address and port. -pub async fn dial(endpoint: &Endpoint) -> Result<UdpConn> { +pub async fn dial<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; // Let the operating system assign an available port to this socket let conn = UdpSocket::bind("[::]:0").await?; conn.connect(addr).await?; - Ok(UdpConn::new(conn)) + Ok(UdpConn::new(conn, config, codec)) } /// Listens on the given UDP address and port. -pub async fn listen(endpoint: &Endpoint) -> Result<UdpConn> { +pub async fn listen<C>(endpoint: &Endpoint, config: UdpConfig, codec: C) -> Result<UdpConn<C>> +where + C: Codec + Clone, +{ let addr = SocketAddr::try_from(endpoint.clone())?; let conn = UdpSocket::bind(addr).await?; - let udp_conn = UdpConn::new(conn); - Ok(udp_conn) -} - -impl From<UdpSocket> for Box<dyn Connection> { - fn from(conn: UdpSocket) -> Self { - Box::new(UdpConn::new(conn)) - } -} - -impl ToConn for UdpSocket { - fn to_conn(self) -> Box<dyn Connection> { - self.into() - } + Ok(UdpConn::new(conn, config, codec)) } -impl ToConn for UdpConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for UdpConn<C> +where + C: Codec + Clone, +{ + type Item = (C::Item, Endpoint); + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } diff --git a/net/src/transports/unix.rs b/net/src/transports/unix.rs index 494e104..bafebaf 100644 --- a/net/src/transports/unix.rs +++ b/net/src/transports/unix.rs @@ -1,111 +1,192 @@ use async_trait::async_trait; +use futures_util::SinkExt; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use karyon_core::async_runtime::{ + io::{split, ReadHalf, WriteHalf}, lock::Mutex, - net::unix::{UnixListener, UnixStream}, + net::{UnixListener as AsyncUnixListener, UnixStream}, }; use crate::{ - connection::{Connection, ToConn}, + codec::Codec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, + listener::{ConnListener, Listener, ToListener}, + stream::{ReadStream, WriteStream}, Error, Result, }; +/// Unix Conn config +#[derive(Clone, Default)] +pub struct UnixConfig {} + /// Unix domain socket implementation of the [`Connection`] trait. -pub struct UnixConn { - inner: UnixStream, - read: Mutex<ReadHalf<UnixStream>>, - write: Mutex<WriteHalf<UnixStream>>, +pub struct UnixConn<C> { + read_stream: Mutex<ReadStream<ReadHalf<UnixStream>, C>>, + write_stream: Mutex<WriteStream<WriteHalf<UnixStream>, C>>, + peer_endpoint: Option<Endpoint>, + local_endpoint: Option<Endpoint>, } -impl UnixConn { - /// Creates a new UnixConn - pub fn new(conn: UnixStream) -> Self { - let (read, write) = split(conn.clone()); +impl<C> UnixConn<C> +where + C: Codec + Clone, +{ + /// Creates a new TcpConn + pub fn new(conn: UnixStream, codec: C) -> Self { + let peer_endpoint = conn + .peer_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + let local_endpoint = conn + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .ok(); + + let (read, write) = split(conn); + let read_stream = Mutex::new(ReadStream::new(read, codec.clone())); + let write_stream = Mutex::new(WriteStream::new(write, codec)); Self { - inner: conn, - read: Mutex::new(read), - write: Mutex::new(write), + read_stream, + write_stream, + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for UnixConn { +impl<C> Connection for UnixConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.peer_addr()?)) + self.peer_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.inner.local_addr()?)) + self.local_endpoint + .clone() + .ok_or(Error::IO(std::io::ErrorKind::AddrNotAvailable.into())) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + self.read_stream.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.write_stream.lock().await.send(msg).await + } +} + +#[allow(dead_code)] +pub struct UnixListener<C> { + inner: AsyncUnixListener, + config: UnixConfig, + codec: C, +} + +impl<C> UnixListener<C> +where + C: Codec + Clone, +{ + pub fn new(listener: AsyncUnixListener, config: UnixConfig, codec: C) -> Self { + Self { + inner: listener, + config, + codec, + } } } #[async_trait] -impl ConnListener for UnixListener { +impl<C> ConnListener for UnixListener<C> +where + C: Codec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_unix_addr(&self.local_addr()?)) + self.inner + .local_addr() + .and_then(|a| { + Ok(Endpoint::new_unix_addr( + a.as_pathname() + .ok_or(std::io::ErrorKind::AddrNotAvailable)?, + )) + }) + .map_err(Error::from) } - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (conn, _) = self.accept().await?; - Ok(Box::new(UnixConn::new(conn))) + async fn accept(&self) -> Result<Conn<C::Item>> { + let (conn, _) = self.inner.accept().await?; + Ok(Box::new(UnixConn::new(conn, self.codec.clone()))) } } /// Connects to the given Unix socket path. -pub async fn dial(path: &String) -> Result<UnixConn> { +pub async fn dial<C>(endpoint: &Endpoint, _config: UnixConfig, codec: C) -> Result<UnixConn<C>> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; let conn = UnixStream::connect(path).await?; - Ok(UnixConn::new(conn)) + Ok(UnixConn::new(conn, codec)) } /// Listens on the given Unix socket path. -pub fn listen(path: &String) -> Result<UnixListener> { - let listener = UnixListener::bind(path)?; - Ok(listener) -} - -impl From<UnixStream> for Box<dyn Connection> { - fn from(conn: UnixStream) -> Self { - Box::new(UnixConn::new(conn)) - } +pub fn listen<C>(endpoint: &Endpoint, config: UnixConfig, codec: C) -> Result<UnixListener<C>> +where + C: Codec + Clone, +{ + let path: std::path::PathBuf = endpoint.clone().try_into()?; + let listener = AsyncUnixListener::bind(path)?; + Ok(UnixListener::new(listener, config, codec)) } -impl From<UnixListener> for Box<dyn ConnListener> { - fn from(listener: UnixListener) -> Self { +// impl From<UnixStream> for Box<dyn Connection> { +// fn from(conn: UnixStream) -> Self { +// Box::new(UnixConn::new(conn)) +// } +// } + +impl<C> From<UnixListener<C>> for Listener<C::Item> +where + C: Codec + Clone, +{ + fn from(listener: UnixListener<C>) -> Self { Box::new(listener) } } -impl ToConn for UnixStream { - fn to_conn(self) -> Box<dyn Connection> { - self.into() - } -} - -impl ToConn for UnixConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for UnixConn<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } -impl ToListener for UnixListener { - fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for UnixListener<C> +where + C: Codec + Clone, +{ + type Item = C::Item; + fn to_listener(self) -> Listener<Self::Item> { self.into() } } diff --git a/net/src/transports/ws.rs b/net/src/transports/ws.rs index eaf3b9b..17fe924 100644 --- a/net/src/transports/ws.rs +++ b/net/src/transports/ws.rs @@ -1,112 +1,254 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use async_trait::async_trait; -use smol::{ - io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, +use rustls_pki_types as pki_types; + +#[cfg(feature = "tokio")] +use async_tungstenite::tokio as async_tungstenite; + +#[cfg(feature = "smol")] +use futures_rustls::{rustls, TlsAcceptor, TlsConnector}; +#[cfg(feature = "tokio")] +use tokio_rustls::{rustls, TlsAcceptor, TlsConnector}; + +use karyon_core::async_runtime::{ lock::Mutex, net::{TcpListener, TcpStream}, }; -use ws_stream_tungstenite::WsStream; - use crate::{ - connection::{Connection, ToConn}, + codec::WebSocketCodec, + connection::{Conn, Connection, ToConn}, endpoint::Endpoint, - listener::{ConnListener, ToListener}, - Error, Result, + listener::{ConnListener, Listener, ToListener}, + stream::WsStream, + Result, }; +use super::tcp::TcpConfig; + +/// WSS configuration +#[derive(Clone)] +pub struct ServerWssConfig { + pub server_config: rustls::ServerConfig, +} + +/// WSS configuration +#[derive(Clone)] +pub struct ClientWssConfig { + pub client_config: rustls::ClientConfig, + pub dns_name: String, +} + +/// WS configuration +#[derive(Clone, Default)] +pub struct ServerWsConfig { + pub tcp_config: TcpConfig, + pub wss_config: Option<ServerWssConfig>, +} + +/// WS configuration +#[derive(Clone, Default)] +pub struct ClientWsConfig { + pub tcp_config: TcpConfig, + pub wss_config: Option<ClientWssConfig>, +} + /// WS network connection implementation of the [`Connection`] trait. -pub struct WsConn { - inner: TcpStream, - read: Mutex<ReadHalf<WsStream<TcpStream>>>, - write: Mutex<WriteHalf<WsStream<TcpStream>>>, +pub struct WsConn<C> { + // XXX: remove mutex + inner: Mutex<WsStream<C>>, + peer_endpoint: Endpoint, + local_endpoint: Endpoint, } -impl WsConn { +impl<C> WsConn<C> +where + C: WebSocketCodec, +{ /// Creates a new WsConn - pub fn new(inner: TcpStream, conn: WsStream<TcpStream>) -> Self { - let (read, write) = split(conn); + pub fn new(ws: WsStream<C>, peer_endpoint: Endpoint, local_endpoint: Endpoint) -> Self { Self { - inner, - read: Mutex::new(read), - write: Mutex::new(write), + inner: Mutex::new(ws), + peer_endpoint, + local_endpoint, } } } #[async_trait] -impl Connection for WsConn { +impl<C> Connection for WsConn<C> +where + C: WebSocketCodec, +{ + type Item = C::Item; fn peer_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_ws_addr(&self.inner.peer_addr()?)) + Ok(self.peer_endpoint.clone()) } fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?)) + Ok(self.local_endpoint.clone()) } - async fn read(&self, buf: &mut [u8]) -> Result<usize> { - self.read.lock().await.read(buf).await.map_err(Error::from) + async fn recv(&self) -> Result<Self::Item> { + self.inner.lock().await.recv().await } - async fn write(&self, buf: &[u8]) -> Result<usize> { - self.write - .lock() - .await - .write(buf) - .await - .map_err(Error::from) + async fn send(&self, msg: Self::Item) -> Result<()> { + self.inner.lock().await.send(msg).await } } /// Ws network listener implementation of the `Listener` [`ConnListener`] trait. -pub struct WsListener { +pub struct WsListener<C> { inner: TcpListener, + config: ServerWsConfig, + codec: C, + tls_acceptor: Option<TlsAcceptor>, } #[async_trait] -impl ConnListener for WsListener { +impl<C> ConnListener for WsListener<C> +where + C: WebSocketCodec + Clone, +{ + type Item = C::Item; fn local_endpoint(&self) -> Result<Endpoint> { - Ok(Endpoint::new_ws_addr(&self.inner.local_addr()?)) + match self.config.wss_config { + Some(_) => Ok(Endpoint::new_wss_addr(self.inner.local_addr()?)), + None => Ok(Endpoint::new_ws_addr(self.inner.local_addr()?)), + } } - async fn accept(&self) -> Result<Box<dyn Connection>> { - let (stream, _) = self.inner.accept().await?; - let conn = async_tungstenite::accept_async(stream.clone()).await?; - Ok(Box::new(WsConn::new(stream, WsStream::new(conn)))) + async fn accept(&self) -> Result<Conn<Self::Item>> { + let (socket, _) = self.inner.accept().await?; + socket.set_nodelay(self.config.tcp_config.nodelay)?; + + match &self.config.wss_config { + Some(_) => match &self.tls_acceptor { + Some(acceptor) => { + let peer_endpoint = socket.peer_addr().map(Endpoint::new_wss_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_wss_addr)?; + + let tls_conn = acceptor.accept(socket).await?.into(); + let conn = async_tungstenite::accept_async(tls_conn).await?; + Ok(Box::new(WsConn::new( + WsStream::new_wss(conn, self.codec.clone()), + peer_endpoint, + local_endpoint, + ))) + } + None => unreachable!(), + }, + None => { + let peer_endpoint = socket.peer_addr().map(Endpoint::new_ws_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_ws_addr)?; + + let conn = async_tungstenite::accept_async(socket).await?; + + Ok(Box::new(WsConn::new( + WsStream::new_ws(conn, self.codec.clone()), + peer_endpoint, + local_endpoint, + ))) + } + } } } /// Connects to the given WS address and port. -pub async fn dial(endpoint: &Endpoint) -> Result<WsConn> { +pub async fn dial<C>(endpoint: &Endpoint, config: ClientWsConfig, codec: C) -> Result<WsConn<C>> +where + C: WebSocketCodec, +{ let addr = SocketAddr::try_from(endpoint.clone())?; - let stream = TcpStream::connect(addr).await?; - let (conn, _resp) = - async_tungstenite::client_async(endpoint.to_string(), stream.clone()).await?; - Ok(WsConn::new(stream, WsStream::new(conn))) + let socket = TcpStream::connect(addr).await?; + socket.set_nodelay(config.tcp_config.nodelay)?; + + match &config.wss_config { + Some(conf) => { + let peer_endpoint = socket.peer_addr().map(Endpoint::new_wss_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_wss_addr)?; + + let connector = TlsConnector::from(Arc::new(conf.client_config.clone())); + + let altname = pki_types::ServerName::try_from(conf.dns_name.clone())?; + let tls_conn = connector.connect(altname, socket).await?.into(); + let (conn, _resp) = + async_tungstenite::client_async(endpoint.to_string(), tls_conn).await?; + Ok(WsConn::new( + WsStream::new_wss(conn, codec), + peer_endpoint, + local_endpoint, + )) + } + None => { + let peer_endpoint = socket.peer_addr().map(Endpoint::new_ws_addr)?; + let local_endpoint = socket.local_addr().map(Endpoint::new_ws_addr)?; + let (conn, _resp) = + async_tungstenite::client_async(endpoint.to_string(), socket).await?; + Ok(WsConn::new( + WsStream::new_ws(conn, codec), + peer_endpoint, + local_endpoint, + )) + } + } } /// Listens on the given WS address and port. -pub async fn listen(endpoint: &Endpoint) -> Result<WsListener> { +pub async fn listen<C>( + endpoint: &Endpoint, + config: ServerWsConfig, + codec: C, +) -> Result<WsListener<C>> { let addr = SocketAddr::try_from(endpoint.clone())?; + let listener = TcpListener::bind(addr).await?; - Ok(WsListener { inner: listener }) + match &config.wss_config { + Some(conf) => { + let acceptor = TlsAcceptor::from(Arc::new(conf.server_config.clone())); + Ok(WsListener { + inner: listener, + config, + codec, + tls_acceptor: Some(acceptor), + }) + } + None => Ok(WsListener { + inner: listener, + config, + codec, + tls_acceptor: None, + }), + } } -impl From<WsListener> for Box<dyn ConnListener> { - fn from(listener: WsListener) -> Self { +impl<C> From<WsListener<C>> for Listener<C::Item> +where + C: WebSocketCodec + Clone, +{ + fn from(listener: WsListener<C>) -> Self { Box::new(listener) } } -impl ToConn for WsConn { - fn to_conn(self) -> Box<dyn Connection> { +impl<C> ToConn for WsConn<C> +where + C: WebSocketCodec, +{ + type Item = C::Item; + fn to_conn(self) -> Conn<Self::Item> { Box::new(self) } } -impl ToListener for WsListener { - fn to_listener(self) -> Box<dyn ConnListener> { +impl<C> ToListener for WsListener<C> +where + C: WebSocketCodec + Clone, +{ + type Item = C::Item; + fn to_listener(self) -> Listener<Self::Item> { self.into() } } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index fc14de2..3327810 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -1,42 +1,43 @@ [package] name = "karyon_p2p" -version.workspace = true +version.workspace = true edition.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["smol"] +smol = ["karyon_core/smol", "karyon_net/smol", "dep:futures-rustls"] +tokio = ["karyon_core/tokio", "karyon_net/tokio", "dep:tokio-rustls"] + [dependencies] -karyon_core = { workspace = true, features=["crypto"] } -karyon_net.workspace = true +karyon_core = { workspace = true, features = [ + "crypto", +], default-features = false } +karyon_net = { workspace = true, default-features = false } -smol = "2.0.0" async-trait = "0.1.77" -futures-util = {version = "0.3.5", features=["std"], default-features = false } +async-channel = "2.3.0" +futures-util = { version = "0.3.5", features = [ + "std", +], default-features = false } log = "0.4.21" chrono = "0.4.35" -bincode = { version="2.0.0-rc.3", features = ["derive"]} +bincode = { version = "2.0.0-rc.3", features = ["derive"] } rand = "0.8.5" thiserror = "1.0.58" semver = "1.0.22" sha2 = "0.10.8" # tls -futures-rustls = { version = "0.25.1", features = ["aws-lc-rs"] } rcgen = "0.12.1" yasna = "0.5.2" x509-parser = "0.16.0" - -[[example]] -name = "peer" -path = "examples/peer.rs" - -[[example]] -name = "chat" -path = "examples/chat.rs" - -[[example]] -name = "monitor" -path = "examples/monitor.rs" +futures-rustls = { version = "0.25.1", features = [ + "aws-lc-rs", +], optional = true } +tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true } +rustls-pki-types = "1.7.0" [dev-dependencies] async-std = "1.12.0" @@ -44,3 +45,4 @@ clap = { version = "4.5.2", features = ["derive"] } ctrlc = "3.4.4" easy-parallel = "3.3.1" env_logger = "0.11.3" +smol = "2.0.0" diff --git a/p2p/README.md b/p2p/README.md index e00d9e5..768fd19 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -1,6 +1,6 @@ # karyon p2p -karyon p2p serves as the foundational stack for the karyon project. It offers +karyon p2p serves as the foundational stack for the Karyon project. It offers a lightweight, extensible, and customizable peer-to-peer (p2p) network stack that seamlessly integrates with any p2p project. @@ -130,7 +130,14 @@ boolean `enable_tls` field in the configuration. However, implementing TLS for a P2P network is not trivial and is still unstable, requiring a comprehensive audit. -## Usage + +## Choosing the async runtime + +karyon p2p currently supports both smol(async-std) and tokio. The default is +smol, but if you want to use tokio, you need to disable the default features +and then select the `tokio` feature. + +## Examples You can check out the examples [here](./examples). diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index cc822d9..4eafb07 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -121,7 +121,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(&key_pair, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone().into()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); @@ -133,7 +133,7 @@ fn main() { let username = cli.username; // Attach the ChatProtocol - let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone()); + let c = move |peer| ChatProtocol::new(&username, peer, ex_cloned.clone().into()); backend.attach_protocol::<ChatProtocol>(c).await.unwrap(); // Run the backend diff --git a/p2p/examples/monitor.rs b/p2p/examples/monitor.rs index b074352..019f751 100644 --- a/p2p/examples/monitor.rs +++ b/p2p/examples/monitor.rs @@ -51,7 +51,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(&key_pair, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone().into()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index 06586b6..db747c9 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -51,7 +51,7 @@ fn main() { let ex = Arc::new(Executor::new()); // Create a new Backend - let backend = Backend::new(&key_pair, config, ex.clone()); + let backend = Backend::new(&key_pair, config, ex.clone().into()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); let handle = move || ctrlc_s.try_send(()).unwrap(); diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs index 8065e63..57d89ef 100644 --- a/p2p/examples/shared/mod.rs +++ b/p2p/examples/shared/mod.rs @@ -1,9 +1,7 @@ -use std::{num::NonZeroUsize, thread}; +use std::{num::NonZeroUsize, sync::Arc, thread}; use easy_parallel::Parallel; -use smol::{channel, future, future::Future}; - -use karyon_core::async_util::Executor; +use smol::{channel, future, future::Future, Executor}; /// Returns an estimate of the default amount of parallelism a program should use. /// see `std::thread::available_parallelism` @@ -14,7 +12,7 @@ fn available_parallelism() -> usize { } /// Run a multi-threaded executor -pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Executor<'_>) { +pub fn run_executor<T>(main_future: impl Future<Output = T>, ex: Arc<Executor<'_>>) { let (signal, shutdown) = channel::unbounded::<()>(); let num_threads = available_parallelism(); diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index d33f3dc..2f21b3e 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use log::info; -use karyon_core::{async_util::Executor, crypto::KeyPair, pubsub::Subscription}; +use karyon_core::{async_runtime::Executor, crypto::KeyPair, pubsub::Subscription}; use crate::{ config::Config, @@ -37,7 +37,7 @@ pub struct Backend { impl Backend { /// Creates a new Backend. - pub fn new(key_pair: &KeyPair, config: Config, ex: Executor<'static>) -> ArcBackend { + pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> ArcBackend { let config = Arc::new(config); let monitor = Arc::new(Monitor::new()); let conn_queue = ConnQueue::new(); diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 726a2f7..3d0f323 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -1,120 +1,69 @@ -use std::time::Duration; +use karyon_core::util::{decode, encode, encode_into_slice}; -use bincode::{Decode, Encode}; - -use karyon_core::{ - async_util::timeout, - util::{decode, encode, encode_into_slice}, -}; - -use karyon_net::{Connection, NetError}; - -use crate::{ - message::{NetMsg, NetMsgCmd, NetMsgHeader, MAX_ALLOWED_MSG_SIZE, MSG_HEADER_SIZE}, - Error, Result, +use karyon_net::{ + codec::{Codec, Decoder, Encoder, LengthCodec}, + Result, }; -pub trait CodecMsg: Decode + Encode + std::fmt::Debug {} -impl<T: Encode + Decode + std::fmt::Debug> CodecMsg for T {} +use crate::message::{NetMsg, RefreshMsg}; -/// A Codec working with generic network connections. -/// -/// It is responsible for both decoding data received from the network and -/// encoding data before sending it. -pub struct Codec { - conn: Box<dyn Connection>, +#[derive(Clone)] +pub struct NetMsgCodec { + inner_codec: LengthCodec, } -impl Codec { - /// Creates a new Codec. - pub fn new(conn: Box<dyn Connection>) -> Self { - Self { conn } - } - - /// Reads a message of type `NetMsg` from the connection. - /// - /// It reads the first 6 bytes as the header of the message, then reads - /// and decodes the remaining message data based on the determined header. - pub async fn read(&self) -> Result<NetMsg> { - // Read 6 bytes to get the header of the incoming message - let mut buf = [0; MSG_HEADER_SIZE]; - self.read_exact(&mut buf).await?; - - // Decode the header from bytes to NetMsgHeader - let (header, _) = decode::<NetMsgHeader>(&buf)?; - - if header.payload_size > MAX_ALLOWED_MSG_SIZE { - return Err(Error::InvalidMsg( - "Message exceeds the maximum allowed size".to_string(), - )); +impl NetMsgCodec { + pub fn new() -> Self { + Self { + inner_codec: LengthCodec {}, } - - // Create a buffer to hold the message based on its length - let mut payload = vec![0; header.payload_size as usize]; - self.read_exact(&mut payload).await?; - - Ok(NetMsg { header, payload }) } +} - /// Writes a message of type `T` to the connection. - /// - /// Before appending the actual message payload, it calculates the length of - /// the encoded message in bytes and appends this length to the message header. - pub async fn write<T: CodecMsg>(&self, command: NetMsgCmd, msg: &T) -> Result<()> { - let payload = encode(msg)?; - - // Create a buffer to hold the message header (6 bytes) - let header_buf = &mut [0; MSG_HEADER_SIZE]; - let header = NetMsgHeader { - command, - payload_size: payload.len() as u32, - }; - encode_into_slice(&header, header_buf)?; - - let mut buffer = vec![]; - // Append the header bytes to the buffer - buffer.extend_from_slice(header_buf); - // Append the message payload to the buffer - buffer.extend_from_slice(&payload); - - self.write_all(&buffer).await?; - Ok(()) - } +impl Codec for NetMsgCodec { + type Item = NetMsg; +} - /// Reads a message of type `NetMsg` with the given timeout. - pub async fn read_timeout(&self, duration: Duration) -> Result<NetMsg> { - timeout(duration, self.read()) - .await - .map_err(|_| NetError::Timeout)? +impl Encoder for NetMsgCodec { + type EnItem = NetMsg; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + let src = encode(src)?; + self.inner_codec.encode(&src, dst) } +} - /// Reads the exact number of bytes required to fill `buf`. - async fn read_exact(&self, mut buf: &mut [u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.read(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at_mut(n); - buf = rest; - - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); +impl Decoder for NetMsgCodec { + type DeItem = NetMsg; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + match self.inner_codec.decode(src)? { + Some((n, s)) => { + let (m, _) = decode::<Self::DeItem>(&s)?; + Ok(Some((n, m))) } + None => Ok(None), } - - Ok(()) } +} - /// Writes an entire buffer into the connection. - async fn write_all(&self, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let n = self.conn.write(buf).await?; - let (_, rest) = std::mem::take(&mut buf).split_at(n); - buf = rest; +#[derive(Clone)] +pub struct RefreshMsgCodec {} - if n == 0 { - return Err(Error::IO(std::io::ErrorKind::UnexpectedEof.into())); - } - } +impl Codec for RefreshMsgCodec { + type Item = RefreshMsg; +} + +impl Encoder for RefreshMsgCodec { + type EnItem = RefreshMsg; + fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> { + let n = encode_into_slice(src, dst)?; + Ok(n) + } +} - Ok(()) +impl Decoder for RefreshMsgCodec { + type DeItem = RefreshMsg; + fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> { + let (m, n) = decode::<Self::DeItem>(src)?; + Ok(Some((n, m))) } } diff --git a/p2p/src/connection.rs b/p2p/src/connection.rs index 9fa57cb..9a153f3 100644 --- a/p2p/src/connection.rs +++ b/p2p/src/connection.rs @@ -1,11 +1,11 @@ use std::{collections::VecDeque, fmt, sync::Arc}; -use smol::{channel::Sender, lock::Mutex}; +use async_channel::Sender; -use karyon_core::async_util::CondVar; +use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar}; use karyon_net::Conn; -use crate::Result; +use crate::{message::NetMsg, Result}; /// Defines the direction of a network connection. #[derive(Clone, Debug)] @@ -25,7 +25,7 @@ impl fmt::Display for ConnDirection { pub struct NewConn { pub direction: ConnDirection, - pub conn: Conn, + pub conn: Conn<NetMsg>, pub disconnect_signal: Sender<Result<()>>, } @@ -44,8 +44,8 @@ impl ConnQueue { } /// Push a connection into the queue and wait for the disconnect signal - pub async fn handle(&self, conn: Conn, direction: ConnDirection) -> Result<()> { - let (disconnect_signal, chan) = smol::channel::bounded(1); + pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> { + let (disconnect_signal, chan) = async_channel::bounded(1); let new_conn = NewConn { direction, conn, diff --git a/p2p/src/connector.rs b/p2p/src/connector.rs index de9e746..aea21ab 100644 --- a/p2p/src/connector.rs +++ b/p2p/src/connector.rs @@ -3,12 +3,15 @@ use std::{future::Future, sync::Arc}; use log::{error, trace, warn}; use karyon_core::{ - async_util::{Backoff, Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; -use karyon_net::{tcp, tls, Conn, Endpoint, NetError}; +use karyon_net::{tcp, tls, Conn, Endpoint, Error as NetError}; use crate::{ + codec::NetMsgCodec, + message::NetMsg, monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, tls_config::tls_client_config, @@ -23,7 +26,7 @@ pub struct Connector { key_pair: KeyPair, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Manages available outbound slots. connection_slots: Arc<ConnectionSlots>, @@ -47,7 +50,7 @@ impl Connector { connection_slots: Arc<ConnectionSlots>, enable_tls: bool, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> Arc<Self> { Arc::new(Self { key_pair: key_pair.clone(), @@ -70,7 +73,11 @@ impl Connector { /// `Conn` instance. /// /// This method will block until it finds an available slot. - pub async fn connect(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> { + pub async fn connect( + &self, + endpoint: &Endpoint, + peer_id: &Option<PeerID>, + ) -> Result<Conn<NetMsg>> { self.connection_slots.wait_for_slot().await; self.connection_slots.add(); @@ -113,7 +120,7 @@ impl Connector { self: &Arc<Self>, endpoint: &Endpoint, peer_id: &Option<PeerID>, - callback: impl FnOnce(Conn) -> Fut + Send + 'static, + callback: impl FnOnce(Conn<NetMsg>) -> Fut + Send + 'static, ) -> Result<()> where Fut: Future<Output = Result<()>> + Send + 'static, @@ -138,14 +145,20 @@ impl Connector { Ok(()) } - async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn> { + async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> { if self.enable_tls { - let tls_config = tls_client_config(&self.key_pair, peer_id.clone())?; - tls::dial(endpoint, tls_config, DNS_NAME) + let tls_config = tls::ClientTlsConfig { + tcp_config: Default::default(), + client_config: tls_client_config(&self.key_pair, peer_id.clone())?, + dns_name: DNS_NAME.to_string(), + }; + tls::dial(endpoint, tls_config, NetMsgCodec::new()) .await - .map(|l| Box::new(l) as Conn) + .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) } else { - tcp::dial(endpoint).await.map(|l| Box::new(l) as Conn) + tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) + .await + .map(|l| Box::new(l) as karyon_net::Conn<NetMsg>) } .map_err(Error::KaryonNet) } diff --git a/p2p/src/discovery/lookup.rs b/p2p/src/discovery/lookup.rs index c81fbc6..cff4610 100644 --- a/p2p/src/discovery/lookup.rs +++ b/p2p/src/discovery/lookup.rs @@ -3,10 +3,13 @@ use std::{sync::Arc, time::Duration}; use futures_util::{stream::FuturesUnordered, StreamExt}; use log::{error, trace}; use rand::{rngs::OsRng, seq::SliceRandom, RngCore}; -use smol::lock::{Mutex, RwLock}; use karyon_core::{ - async_util::{timeout, Executor}, + async_runtime::{ + lock::{Mutex, RwLock}, + Executor, + }, + async_util::timeout, crypto::KeyPair, util::decode, }; @@ -14,7 +17,6 @@ use karyon_core::{ use karyon_net::{Conn, Endpoint}; use crate::{ - codec::Codec, connector::Connector, listener::Listener, message::{ @@ -64,7 +66,7 @@ impl LookupService { table: Arc<Mutex<RoutingTable>>, config: Arc<Config>, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> Self { let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots)); @@ -228,8 +230,7 @@ impl LookupService { target_peer_id: &PeerID, ) -> Result<Vec<PeerMsg>> { let conn = self.connector.connect(&endpoint, &peer_id).await?; - let io_codec = Codec::new(conn); - let result = self.handle_outbound(io_codec, target_peer_id).await; + let result = self.handle_outbound(conn, target_peer_id).await; self.monitor .notify(&ConnEvent::Disconnected(endpoint).into()) @@ -242,14 +243,14 @@ impl LookupService { /// Handles outbound connection async fn handle_outbound( &self, - io_codec: Codec, + conn: Conn<NetMsg>, target_peer_id: &PeerID, ) -> Result<Vec<PeerMsg>> { trace!("Send Ping msg"); - self.send_ping_msg(&io_codec).await?; + self.send_ping_msg(&conn).await?; trace!("Send FindPeer msg"); - let peers = self.send_findpeer_msg(&io_codec, target_peer_id).await?; + let peers = self.send_findpeer_msg(&conn, target_peer_id).await?; if peers.0.len() >= MAX_PEERS_IN_PEERSMSG { return Err(Error::Lookup("Received too many peers in PeersMsg")); @@ -257,12 +258,12 @@ impl LookupService { trace!("Send Peer msg"); if let Some(endpoint) = &self.listen_endpoint { - self.send_peer_msg(&io_codec, endpoint.read().await.clone()) + self.send_peer_msg(&conn, endpoint.read().await.clone()) .await?; } trace!("Send Shutdown msg"); - self.send_shutdown_msg(&io_codec).await?; + self.send_shutdown_msg(&conn).await?; Ok(peers.0) } @@ -277,7 +278,7 @@ impl LookupService { let endpoint = Endpoint::Tcp(addr, self.config.discovery_port); let selfc = self.clone(); - let callback = |conn: Conn| async move { + let callback = |conn: Conn<NetMsg>| async move { let t = Duration::from_secs(selfc.config.lookup_connection_lifespan); timeout(t, selfc.handle_inbound(conn)).await??; Ok(()) @@ -288,10 +289,9 @@ impl LookupService { } /// Handles inbound connection - async fn handle_inbound(self: &Arc<Self>, conn: Conn) -> Result<()> { - let io_codec = Codec::new(conn); + async fn handle_inbound(self: &Arc<Self>, conn: Conn<NetMsg>) -> Result<()> { loop { - let msg: NetMsg = io_codec.read().await?; + let msg: NetMsg = conn.recv().await?; trace!("Receive msg {:?}", msg.header.command); if let NetMsgCmd::Shutdown = msg.header.command { @@ -304,12 +304,12 @@ impl LookupService { if !version_match(&self.config.version.req, &ping_msg.version) { return Err(Error::IncompatibleVersion("system: {}".into())); } - self.send_pong_msg(ping_msg.nonce, &io_codec).await?; + self.send_pong_msg(ping_msg.nonce, &conn).await?; } NetMsgCmd::FindPeer => { let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?; let peer_id = findpeer_msg.0; - self.send_peers_msg(&peer_id, &io_codec).await?; + self.send_peers_msg(&peer_id, &conn).await?; } NetMsgCmd::Peer => { let (peer, _) = decode::<PeerMsg>(&msg.payload)?; @@ -322,7 +322,7 @@ impl LookupService { } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, io_codec: &Codec) -> Result<()> { + async fn send_ping_msg(&self, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Pong msg"); let mut nonce: [u8; 32] = [0; 32]; @@ -332,10 +332,10 @@ impl LookupService { version: self.config.version.v.clone(), nonce, }; - io_codec.write(NetMsgCmd::Ping, &ping_msg).await?; + conn.send(NetMsg::new(NetMsgCmd::Ping, &ping_msg)?).await?; let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + let recv_msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Pong, recv_msg); let (pong_msg, _) = decode::<PongMsg>(&payload)?; @@ -348,21 +348,24 @@ impl LookupService { } /// Sends a Pong msg - async fn send_pong_msg(&self, nonce: [u8; 32], io_codec: &Codec) -> Result<()> { + async fn send_pong_msg(&self, nonce: [u8; 32], conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Pong msg"); - io_codec.write(NetMsgCmd::Pong, &PongMsg(nonce)).await?; + conn.send(NetMsg::new(NetMsgCmd::Pong, &PongMsg(nonce))?) + .await?; Ok(()) } /// Sends a FindPeer msg and wait to receivet the Peers msg. - async fn send_findpeer_msg(&self, io_codec: &Codec, peer_id: &PeerID) -> Result<PeersMsg> { + async fn send_findpeer_msg(&self, conn: &Conn<NetMsg>, peer_id: &PeerID) -> Result<PeersMsg> { trace!("Send FindPeer msg"); - io_codec - .write(NetMsgCmd::FindPeer, &FindPeerMsg(peer_id.clone())) - .await?; + conn.send(NetMsg::new( + NetMsgCmd::FindPeer, + &FindPeerMsg(peer_id.clone()), + )?) + .await?; let t = Duration::from_secs(self.config.lookup_response_timeout); - let recv_msg: NetMsg = io_codec.read_timeout(t).await?; + let recv_msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Peers, recv_msg); let (peers, _) = decode(&payload)?; @@ -371,19 +374,20 @@ impl LookupService { } /// Sends a Peers msg. - async fn send_peers_msg(&self, peer_id: &PeerID, io_codec: &Codec) -> Result<()> { + async fn send_peers_msg(&self, peer_id: &PeerID, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Peers msg"); let table = self.table.lock().await; let entries = table.closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG); drop(table); let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect(); - io_codec.write(NetMsgCmd::Peers, &PeersMsg(peers)).await?; + conn.send(NetMsg::new(NetMsgCmd::Peers, &PeersMsg(peers))?) + .await?; Ok(()) } /// Sends a Peer msg. - async fn send_peer_msg(&self, io_codec: &Codec, endpoint: Endpoint) -> Result<()> { + async fn send_peer_msg(&self, conn: &Conn<NetMsg>, endpoint: Endpoint) -> Result<()> { trace!("Send Peer msg"); let peer_msg = PeerMsg { addr: endpoint.addr()?.clone(), @@ -391,14 +395,15 @@ impl LookupService { discovery_port: self.config.discovery_port, peer_id: self.id.clone(), }; - io_codec.write(NetMsgCmd::Peer, &peer_msg).await?; + conn.send(NetMsg::new(NetMsgCmd::Peer, &peer_msg)?).await?; Ok(()) } /// Sends a Shutdown msg. - async fn send_shutdown_msg(&self, io_codec: &Codec) -> Result<()> { + async fn send_shutdown_msg(&self, conn: &Conn<NetMsg>) -> Result<()> { trace!("Send Shutdown msg"); - io_codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await?; + conn.send(NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0))?) + .await?; Ok(()) } } diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 3e437aa..19ae77a 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -5,10 +5,10 @@ use std::sync::Arc; use log::{error, info}; use rand::{rngs::OsRng, seq::SliceRandom}; -use smol::lock::Mutex; use karyon_core::{ - async_util::{Backoff, Executor, TaskGroup, TaskResult}, + async_runtime::{lock::Mutex, Executor}, + async_util::{Backoff, TaskGroup, TaskResult}, crypto::KeyPair, }; @@ -19,6 +19,7 @@ use crate::{ connection::{ConnDirection, ConnQueue}, connector::Connector, listener::Listener, + message::NetMsg, monitor::Monitor, routing_table::{ Entry, EntryStatusFlag, RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, @@ -45,6 +46,7 @@ pub struct Discovery { /// Connector connector: Arc<Connector>, + /// Listener listener: Arc<Listener>, @@ -53,11 +55,12 @@ pub struct Discovery { /// Inbound slots. pub(crate) inbound_slots: Arc<ConnectionSlots>, + /// Outbound slots. pub(crate) outbound_slots: Arc<ConnectionSlots>, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -71,7 +74,7 @@ impl Discovery { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> ArcDiscovery { let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots)); let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots)); @@ -180,7 +183,7 @@ impl Discovery { /// Start a listener and on success, return the resolved endpoint. async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> { let selfc = self.clone(); - let callback = |c: Conn| async move { + let callback = |c: Conn<NetMsg>| async move { selfc.conn_queue.handle(c, ConnDirection::Inbound).await?; Ok(()) }; @@ -198,8 +201,8 @@ impl Discovery { async fn connect_loop(self: Arc<Self>) -> Result<()> { let backoff = Backoff::new(500, self.config.seeding_interval * 1000); loop { - let random_entry = self.random_entry(PENDING_ENTRY).await; - match random_entry { + let random_table_entry = self.random_table_entry(PENDING_ENTRY).await; + match random_table_entry { Some(entry) => { backoff.reset(); let endpoint = Endpoint::Tcp(entry.addr, entry.port); @@ -218,7 +221,7 @@ impl Discovery { let selfc = self.clone(); let pid_c = pid.clone(); let endpoint_c = endpoint.clone(); - let cback = |conn: Conn| async move { + let cback = |conn: Conn<NetMsg>| async move { let result = selfc.conn_queue.handle(conn, ConnDirection::Outbound).await; // If the entry is not in the routing table, ignore the result @@ -230,17 +233,17 @@ impl Discovery { match result { Err(Error::IncompatiblePeer) => { error!("Failed to do handshake: {endpoint_c} incompatible peer"); - selfc.update_entry(&pid, INCOMPATIBLE_ENTRY).await; + selfc.update_table_entry(&pid, INCOMPATIBLE_ENTRY).await; } Err(Error::PeerAlreadyConnected) => { - // TODO: Use the appropriate status. - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + // TODO: Use an appropriate status. + selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; } Err(_) => { - selfc.update_entry(&pid, UNSTABLE_ENTRY).await; + selfc.update_table_entry(&pid, UNSTABLE_ENTRY).await; } Ok(_) => { - selfc.update_entry(&pid, DISCONNECTED_ENTRY).await; + selfc.update_table_entry(&pid, DISCONNECTED_ENTRY).await; } } @@ -255,10 +258,10 @@ impl Discovery { if let Some(pid) = &pid { match result { Ok(_) => { - self.update_entry(pid, CONNECTED_ENTRY).await; + self.update_table_entry(pid, CONNECTED_ENTRY).await; } Err(_) => { - self.update_entry(pid, UNREACHABLE_ENTRY).await; + self.update_table_entry(pid, UNREACHABLE_ENTRY).await; } } } @@ -271,12 +274,16 @@ impl Discovery { /// table doesn't have an available entry, it will connect to one of the /// provided bootstrap endpoints in the `Config` and initiate the lookup. async fn start_seeding(&self) { - match self.random_entry(PENDING_ENTRY | CONNECTED_ENTRY).await { + match self + .random_table_entry(PENDING_ENTRY | CONNECTED_ENTRY) + .await + { Some(entry) => { let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port); let peer_id = Some(entry.key.into()); if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await { - self.update_entry(&entry.key.into(), UNSTABLE_ENTRY).await; + self.update_table_entry(&entry.key.into(), UNSTABLE_ENTRY) + .await; error!("Failed to do lookup: {endpoint}: {err}"); } } @@ -292,12 +299,12 @@ impl Discovery { } /// Returns a random entry from routing table. - async fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { + async fn random_table_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> { self.table.lock().await.random_entry(entry_flag).cloned() } /// Update the entry status - async fn update_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { + async fn update_table_entry(&self, pid: &PeerID, entry_flag: EntryStatusFlag) { let table = &mut self.table.lock().await; table.update_entry(&pid.0, entry_flag); } diff --git a/p2p/src/discovery/refresh.rs b/p2p/src/discovery/refresh.rs index 035a581..0c49ac2 100644 --- a/p2p/src/discovery/refresh.rs +++ b/p2p/src/discovery/refresh.rs @@ -3,31 +3,28 @@ use std::{sync::Arc, time::Duration}; use bincode::{Decode, Encode}; use log::{error, info, trace}; use rand::{rngs::OsRng, RngCore}; -use smol::{ - lock::{Mutex, RwLock}, - stream::StreamExt, - Timer, -}; use karyon_core::{ - async_util::{timeout, Backoff, Executor, TaskGroup, TaskResult}, - util::{decode, encode}, + async_runtime::{ + lock::{Mutex, RwLock}, + Executor, + }, + async_util::{sleep, timeout, Backoff, TaskGroup, TaskResult}, }; -use karyon_net::{udp, Connection, Endpoint, NetError}; - -/// Maximum failures for an entry before removing it from the routing table. -pub const MAX_FAILURES: u32 = 3; - -/// Ping message size -const PINGMSG_SIZE: usize = 32; +use karyon_net::{udp, Connection, Endpoint, Error as NetError}; use crate::{ + codec::RefreshMsgCodec, + message::RefreshMsg, monitor::{ConnEvent, DiscoveryEvent, Monitor}, routing_table::{BucketEntry, Entry, RoutingTable, PENDING_ENTRY, UNREACHABLE_ENTRY}, Config, Error, Result, }; +/// Maximum failures for an entry before removing it from the routing table. +pub const MAX_FAILURES: u32 = 3; + #[derive(Decode, Encode, Debug, Clone)] pub struct PingMsg(pub [u8; 32]); @@ -42,10 +39,10 @@ pub struct RefreshService { listen_endpoint: Option<RwLock<Endpoint>>, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// A global executor - executor: Executor<'static>, + executor: Executor, /// Holds the configuration for the P2P network. config: Arc<Config>, @@ -60,7 +57,7 @@ impl RefreshService { config: Arc<Config>, table: Arc<Mutex<RoutingTable>>, monitor: Arc<Monitor>, - executor: Executor<'static>, + executor: Executor, ) -> Self { let listen_endpoint = config .listen_endpoint @@ -118,9 +115,8 @@ impl RefreshService { /// selects the first 8 entries (oldest entries) from each bucket in the /// routing table and starts sending Ping messages to the collected entries. async fn refresh_loop(self: Arc<Self>) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.config.refresh_interval)); loop { - timer.next().await; + sleep(Duration::from_secs(self.config.refresh_interval)).await; trace!("Start refreshing the routing table..."); self.monitor @@ -162,7 +158,7 @@ impl RefreshService { } for task in tasks { - task.await; + let _ = task.await; } } } @@ -193,10 +189,10 @@ impl RefreshService { async fn connect(&self, entry: &Entry) -> Result<()> { let mut retry = 0; let endpoint = Endpoint::Udp(entry.addr.clone(), entry.discovery_port); - let conn = udp::dial(&endpoint).await?; + let conn = udp::dial(&endpoint, Default::default(), RefreshMsgCodec {}).await?; let backoff = Backoff::new(100, 5000); while retry < self.config.refresh_connect_retries { - match self.send_ping_msg(&conn).await { + match self.send_ping_msg(&conn, &endpoint).await { Ok(()) => return Ok(()), Err(Error::KaryonNet(NetError::Timeout)) => { retry += 1; @@ -214,7 +210,7 @@ impl RefreshService { /// Set up a UDP listener and start listening for Ping messages from other /// peers. async fn listen_loop(self: Arc<Self>, endpoint: Endpoint) -> Result<()> { - let conn = match udp::listen(&endpoint).await { + let conn = match udp::listen(&endpoint, Default::default(), RefreshMsgCodec {}).await { Ok(c) => { self.monitor .notify(&ConnEvent::Listening(endpoint.clone()).into()) @@ -240,46 +236,48 @@ impl RefreshService { } /// Listen to receive a Ping message and respond with a Pong message. - async fn listen_to_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { - let mut buf = [0; PINGMSG_SIZE]; - let (_, endpoint) = conn.recv_from(&mut buf).await?; - + async fn listen_to_ping_msg(&self, conn: &udp::UdpConn<RefreshMsgCodec>) -> Result<()> { + let (msg, endpoint) = conn.recv().await?; self.monitor .notify(&ConnEvent::Accepted(endpoint.clone()).into()) .await; - let (ping_msg, _) = decode::<PingMsg>(&buf)?; - - let pong_msg = PongMsg(ping_msg.0); - let buffer = encode(&pong_msg)?; - - conn.send_to(&buffer, &endpoint).await?; + match msg { + RefreshMsg::Ping(m) => { + let pong_msg = RefreshMsg::Pong(m); + conn.send((pong_msg, endpoint.clone())).await?; + } + RefreshMsg::Pong(_) => return Err(Error::InvalidMsg("Unexpected pong msg".into())), + } self.monitor - .notify(&ConnEvent::Disconnected(endpoint.clone()).into()) + .notify(&ConnEvent::Disconnected(endpoint).into()) .await; Ok(()) } /// Sends a Ping msg and wait to receive the Pong message. - async fn send_ping_msg(&self, conn: &udp::UdpConn) -> Result<()> { + async fn send_ping_msg( + &self, + conn: &udp::UdpConn<RefreshMsgCodec>, + endpoint: &Endpoint, + ) -> Result<()> { let mut nonce: [u8; 32] = [0; 32]; RngCore::fill_bytes(&mut OsRng, &mut nonce); + conn.send((RefreshMsg::Ping(nonce), endpoint.clone())) + .await?; - let ping_msg = PingMsg(nonce); - let buffer = encode(&ping_msg)?; - conn.write(&buffer).await?; - - let buf = &mut [0; PINGMSG_SIZE]; let t = Duration::from_secs(self.config.refresh_response_timeout); - timeout(t, conn.read(buf)).await??; - - let (pong_msg, _) = decode::<PongMsg>(buf)?; + let (msg, _) = timeout(t, conn.recv()).await??; - if ping_msg.0 != pong_msg.0 { - return Err(Error::InvalidPongMsg); + match msg { + RefreshMsg::Pong(n) => { + if n != nonce { + return Err(Error::InvalidPongMsg); + } + Ok(()) + } + _ => Err(Error::InvalidMsg("Unexpected ping msg".into())), } - - Ok(()) } } diff --git a/p2p/src/error.rs b/p2p/src/error.rs index b4ddc2e..97b7b7f 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -62,27 +62,35 @@ pub enum Error { #[error("Rcgen Error: {0}")] Rcgen(#[from] rcgen::Error), + #[cfg(feature = "smol")] #[error("Tls Error: {0}")] Rustls(#[from] futures_rustls::rustls::Error), + #[cfg(feature = "tokio")] + #[error("Tls Error: {0}")] + Rustls(#[from] tokio_rustls::rustls::Error), + #[error("Invalid DNS Name: {0}")] - InvalidDnsNameError(#[from] futures_rustls::pki_types::InvalidDnsNameError), + InvalidDnsNameError(#[from] rustls_pki_types::InvalidDnsNameError), #[error("Channel Send Error: {0}")] ChannelSend(String), #[error(transparent)] - ChannelRecv(#[from] smol::channel::RecvError), + ChannelRecv(#[from] async_channel::RecvError), #[error(transparent)] KaryonCore(#[from] karyon_core::error::Error), #[error(transparent)] - KaryonNet(#[from] karyon_net::NetError), + KaryonNet(#[from] karyon_net::Error), + + #[error("Other Error: {0}")] + Other(String), } -impl<T> From<smol::channel::SendError<T>> for Error { - fn from(error: smol::channel::SendError<T>) -> Self { +impl<T> From<async_channel::SendError<T>> for Error { + fn from(error: async_channel::SendError<T>) -> Self { Error::ChannelSend(error.to_string()) } } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 3605359..8f3cf45 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -5,7 +5,7 @@ //! use std::sync::Arc; //! //! use easy_parallel::Parallel; -//! use smol::{channel as smol_channel, future, Executor}; +//! use smol::{future, Executor}; //! //! use karyon_core::crypto::{KeyPair, KeyPairType}; //! use karyon_p2p::{Backend, Config, PeerID}; @@ -19,7 +19,7 @@ //! let ex = Arc::new(Executor::new()); //! //! // Create a new Backend -//! let backend = Backend::new(&key_pair, config, ex.clone()); +//! let backend = Backend::new(&key_pair, config, ex.clone().into()); //! //! let task = async { //! // Run the backend diff --git a/p2p/src/listener.rs b/p2p/src/listener.rs index 4a41482..1abf79a 100644 --- a/p2p/src/listener.rs +++ b/p2p/src/listener.rs @@ -3,13 +3,16 @@ use std::{future::Future, sync::Arc}; use log::{debug, error, info}; use karyon_core::{ - async_util::{Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{TaskGroup, TaskResult}, crypto::KeyPair, }; -use karyon_net::{tcp, tls, Conn, ConnListener, Endpoint}; +use karyon_net::{tcp, tls, Conn, Endpoint}; use crate::{ + codec::NetMsgCodec, + message::NetMsg, monitor::{ConnEvent, Monitor}, slots::ConnectionSlots, tls_config::tls_server_config, @@ -22,7 +25,7 @@ pub struct Listener { key_pair: KeyPair, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// Manages available inbound slots. connection_slots: Arc<ConnectionSlots>, @@ -41,7 +44,7 @@ impl Listener { connection_slots: Arc<ConnectionSlots>, enable_tls: bool, monitor: Arc<Monitor>, - ex: Executor<'static>, + ex: Executor, ) -> Arc<Self> { Arc::new(Self { key_pair: key_pair.clone(), @@ -61,7 +64,7 @@ impl Listener { self: &Arc<Self>, endpoint: Endpoint, // https://github.com/rust-lang/rfcs/pull/2132 - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, + callback: impl FnOnce(Conn<NetMsg>) -> Fut + Clone + Send + 'static, ) -> Result<Endpoint> where Fut: Future<Output = Result<()>> + Send + 'static, @@ -82,7 +85,7 @@ impl Listener { } }; - let resolved_endpoint = listener.local_endpoint()?; + let resolved_endpoint = listener.local_endpoint().map_err(Error::from)?; info!("Start listening on {resolved_endpoint}"); @@ -99,8 +102,8 @@ impl Listener { async fn listen_loop<Fut>( self: Arc<Self>, - listener: Box<dyn ConnListener>, - callback: impl FnOnce(Conn) -> Fut + Clone + Send + 'static, + listener: karyon_net::Listener<NetMsg>, + callback: impl FnOnce(Conn<NetMsg>) -> Fut + Clone + Send + 'static, ) where Fut: Future<Output = Result<()>> + Send + 'static, { @@ -112,7 +115,7 @@ impl Listener { let (conn, endpoint) = match result { Ok(c) => { let endpoint = match c.peer_endpoint() { - Ok(e) => e, + Ok(ep) => ep, Err(err) => { self.monitor.notify(&ConnEvent::AcceptFailed.into()).await; error!("Failed to accept a new connection: {err}"); @@ -151,16 +154,19 @@ impl Listener { } } - async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener> { + async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> { if self.enable_tls { - let tls_config = tls_server_config(&self.key_pair)?; - tls::listen(endpoint, tls_config) + let tls_config = tls::ServerTlsConfig { + tcp_config: Default::default(), + server_config: tls_server_config(&self.key_pair)?, + }; + tls::listen(endpoint, tls_config, NetMsgCodec::new()) .await - .map(|l| Box::new(l) as karyon_net::Listener) + .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) } else { - tcp::listen(endpoint) + tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new()) .await - .map(|l| Box::new(l) as karyon_net::Listener) + .map(|l| Box::new(l) as karyon_net::Listener<NetMsg>) } .map_err(Error::KaryonNet) } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 1342110..6498ef7 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -2,15 +2,10 @@ use std::collections::HashMap; use bincode::{Decode, Encode}; +use karyon_core::util::encode; use karyon_net::{Addr, Port}; -use crate::{protocol::ProtocolID, routing_table::Entry, version::VersionInt, PeerID}; - -/// The size of the message header, in bytes. -pub const MSG_HEADER_SIZE: usize = 6; - -/// The maximum allowed size for a message in bytes. -pub const MAX_ALLOWED_MSG_SIZE: u32 = 1024 * 1024; // 1MB +use crate::{protocol::ProtocolID, routing_table::Entry, version::VersionInt, PeerID, Result}; /// Defines the main message in the karyon p2p network. /// @@ -23,11 +18,19 @@ pub struct NetMsg { pub payload: Vec<u8>, } +impl NetMsg { + pub fn new<T: Encode>(command: NetMsgCmd, t: T) -> Result<Self> { + Ok(Self { + header: NetMsgHeader { command }, + payload: encode(&t)?, + }) + } +} + /// Represents the header of a message. #[derive(Decode, Encode, Debug, Clone)] pub struct NetMsgHeader { pub command: NetMsgCmd, - pub payload_size: u32, } /// Defines message commands. @@ -39,7 +42,7 @@ pub enum NetMsgCmd { Protocol, Shutdown, - // NOTE: The following commands are used during the lookup process. + // The following commands are used during the lookup process. Ping, Pong, FindPeer, @@ -47,6 +50,12 @@ pub enum NetMsgCmd { Peers, } +#[derive(Decode, Encode, Debug, Clone)] +pub enum RefreshMsg { + Ping([u8; 32]), + Pong([u8; 32]), +} + /// Defines a message related to a specific protocol. #[derive(Decode, Encode, Debug, Clone)] pub struct ProtocolMsg { @@ -103,21 +112,6 @@ pub struct PeerMsg { #[derive(Decode, Encode, Debug)] pub struct PeersMsg(pub Vec<PeerMsg>); -macro_rules! get_msg_payload { - ($a:ident, $b:ident) => { - if let NetMsgCmd::$a = $b.header.command { - $b.payload - } else { - return Err(Error::InvalidMsg(format!( - "Unexpected msg {:?}", - $b.header.command - ))); - } - }; -} - -pub(super) use get_msg_payload; - impl From<Entry> for PeerMsg { fn from(entry: Entry) -> PeerMsg { PeerMsg { @@ -139,3 +133,19 @@ impl From<PeerMsg> for Entry { } } } + +macro_rules! get_msg_payload { + ($a:ident, $b:ident) => { + if let NetMsgCmd::$a = $b.header.command { + $b.payload + } else { + return Err(Error::InvalidMsg(format!( + "Expected {:?} msg found {:?} msg", + stringify!($a), + $b.header.command + ))); + } + }; +} + +pub(super) use get_msg_payload; diff --git a/p2p/src/monitor.rs b/p2p/src/monitor.rs index b0ce028..48719c0 100644 --- a/p2p/src/monitor.rs +++ b/p2p/src/monitor.rs @@ -26,7 +26,7 @@ use karyon_net::Endpoint; /// let ex = Arc::new(Executor::new()); /// /// let key_pair = KeyPair::generate(&KeyPairType::Ed25519); -/// let backend = Backend::new(&key_pair, Config::default(), ex); +/// let backend = Backend::new(&key_pair, Config::default(), ex.into()); /// /// // Create a new Subscription /// let sub = backend.monitor().await; diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index ca68530..f0f6f17 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -4,24 +4,22 @@ pub use peer_id::PeerID; use std::sync::Arc; +use async_channel::{Receiver, Sender}; +use bincode::{Decode, Encode}; use log::{error, trace}; -use smol::{ - channel::{self, Receiver, Sender}, - lock::RwLock, -}; use karyon_core::{ - async_util::{select, Either, Executor, TaskGroup, TaskResult}, + async_runtime::{lock::RwLock, Executor}, + async_util::{select, Either, TaskGroup, TaskResult}, event::{ArcEventSys, EventListener, EventSys}, util::{decode, encode}, }; -use karyon_net::Endpoint; +use karyon_net::{Conn, Endpoint}; use crate::{ - codec::{Codec, CodecMsg}, connection::ConnDirection, - message::{NetMsgCmd, ProtocolMsg, ShutdownMsg}, + message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg}, peer_pool::{ArcPeerPool, WeakPeerPool}, protocol::{Protocol, ProtocolEvent, ProtocolID}, Config, Error, Result, @@ -36,8 +34,8 @@ pub struct Peer { /// A weak pointer to `PeerPool` peer_pool: WeakPeerPool, - /// Holds the Codec for the peer connection - codec: Codec, + /// Holds the peer connection + conn: Conn<NetMsg>, /// Remote endpoint for the peer remote_endpoint: Endpoint, @@ -55,7 +53,7 @@ pub struct Peer { stop_chan: (Sender<Result<()>>, Receiver<Result<()>>), /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, } impl Peer { @@ -63,21 +61,21 @@ impl Peer { pub fn new( peer_pool: WeakPeerPool, id: &PeerID, - codec: Codec, + conn: Conn<NetMsg>, remote_endpoint: Endpoint, conn_direction: ConnDirection, - ex: Executor<'static>, + ex: Executor, ) -> ArcPeer { Arc::new(Peer { id: id.clone(), peer_pool, - codec, + conn, protocol_ids: RwLock::new(Vec::new()), remote_endpoint, conn_direction, protocol_events: EventSys::new(), task_group: TaskGroup::with_executor(ex), - stop_chan: channel::bounded(1), + stop_chan: async_channel::bounded(1), }) } @@ -88,7 +86,7 @@ impl Peer { } /// Send a message to the peer connection using the specified protocol. - pub async fn send<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { + pub async fn send<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) -> Result<()> { let payload = encode(msg)?; let proto_msg = ProtocolMsg { @@ -96,12 +94,14 @@ impl Peer { payload: payload.to_vec(), }; - self.codec.write(NetMsgCmd::Protocol, &proto_msg).await?; + self.conn + .send(NetMsg::new(NetMsgCmd::Protocol, &proto_msg)?) + .await?; Ok(()) } /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast<T: CodecMsg>(&self, protocol_id: &ProtocolID, msg: &T) { + pub async fn broadcast<T: Encode + Decode>(&self, protocol_id: &ProtocolID, msg: &T) { self.peer_pool().broadcast(protocol_id, msg).await; } @@ -123,7 +123,9 @@ impl Peer { let _ = self.stop_chan.0.try_send(Ok(())); // No need to handle the error here - let _ = self.codec.write(NetMsgCmd::Shutdown, &ShutdownMsg(0)).await; + let shutdown_msg = + NetMsg::new(NetMsgCmd::Shutdown, &ShutdownMsg(0)).expect("pack shutdown message"); + let _ = self.conn.send(shutdown_msg).await; // Force shutting down self.task_group.cancel().await; @@ -170,7 +172,7 @@ impl Peer { /// Start a read loop to handle incoming messages from the peer connection. async fn read_loop(&self) -> Result<()> { loop { - let fut = select(self.stop_chan.1.recv(), self.codec.read()).await; + let fut = select(self.stop_chan.1.recv(), self.conn.recv()).await; let result = match fut { Either::Left(stop_signal) => { trace!("Peer {} received a stop signal", self.id); diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index 4e20c99..8b16ef5 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -4,21 +4,22 @@ use std::{ time::Duration, }; +use async_channel::Sender; +use bincode::{Decode, Encode}; use log::{error, info, trace, warn}; -use smol::{ - channel::Sender, - lock::{Mutex, RwLock}, -}; use karyon_core::{ - async_util::{Executor, TaskGroup, TaskResult}, + async_runtime::{ + lock::{Mutex, RwLock}, + Executor, + }, + async_util::{timeout, TaskGroup, TaskResult}, util::decode, }; use karyon_net::Conn; use crate::{ - codec::{Codec, CodecMsg}, config::Config, connection::{ConnDirection, ConnQueue}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, @@ -50,10 +51,10 @@ pub struct PeerPool { protocol_versions: Arc<RwLock<HashMap<ProtocolID, Version>>>, /// Managing spawned tasks. - task_group: TaskGroup<'static>, + task_group: TaskGroup, /// A global Executor - executor: Executor<'static>, + executor: Executor, /// The Configuration for the P2P network. pub(crate) config: Arc<Config>, @@ -69,7 +70,7 @@ impl PeerPool { conn_queue: Arc<ConnQueue>, config: Arc<Config>, monitor: Arc<Monitor>, - executor: Executor<'static>, + executor: Executor, ) -> Arc<Self> { let protocols = RwLock::new(HashMap::new()); let protocol_versions = Arc::new(RwLock::new(HashMap::new())); @@ -137,7 +138,7 @@ impl PeerPool { } /// Broadcast a message to all connected peers using the specified protocol. - pub async fn broadcast<T: CodecMsg>(&self, proto_id: &ProtocolID, msg: &T) { + pub async fn broadcast<T: Decode + Encode>(&self, proto_id: &ProtocolID, msg: &T) { for (pid, peer) in self.peers.lock().await.iter() { if let Err(err) = peer.send(proto_id, msg).await { error!("failed to send msg to {pid}: {err}"); @@ -149,15 +150,14 @@ impl PeerPool { /// Add a new peer to the peer list. pub async fn new_peer( self: &Arc<Self>, - conn: Conn, + conn: Conn<NetMsg>, conn_direction: &ConnDirection, disconnect_signal: Sender<Result<()>>, ) -> Result<()> { let endpoint = conn.peer_endpoint()?; - let codec = Codec::new(conn); // Do a handshake with the connection before creating a new peer. - let pid = self.do_handshake(&codec, conn_direction).await?; + let pid = self.do_handshake(&conn, conn_direction).await?; // TODO: Consider restricting the subnet for inbound connections if self.contains_peer(&pid).await { @@ -168,7 +168,7 @@ impl PeerPool { let peer = Peer::new( Arc::downgrade(self), &pid, - codec, + conn, endpoint.clone(), conn_direction.clone(), self.executor.clone(), @@ -234,16 +234,21 @@ impl PeerPool { } /// Initiate a handshake with a connection. - async fn do_handshake(&self, codec: &Codec, conn_direction: &ConnDirection) -> Result<PeerID> { + async fn do_handshake( + &self, + conn: &Conn<NetMsg>, + conn_direction: &ConnDirection, + ) -> Result<PeerID> { + trace!("Handshake started: {}", conn.peer_endpoint()?); match conn_direction { ConnDirection::Inbound => { - let result = self.wait_vermsg(codec).await; + let result = self.wait_vermsg(conn).await; match result { Ok(_) => { - self.send_verack(codec, true).await?; + self.send_verack(conn, true).await?; } Err(Error::IncompatibleVersion(_)) | Err(Error::UnsupportedProtocol(_)) => { - self.send_verack(codec, false).await?; + self.send_verack(conn, false).await?; } _ => {} } @@ -251,14 +256,14 @@ impl PeerPool { } ConnDirection::Outbound => { - self.send_vermsg(codec).await?; - self.wait_verack(codec).await + self.send_vermsg(conn).await?; + self.wait_verack(conn).await } } } /// Send a Version message - async fn send_vermsg(&self, codec: &Codec) -> Result<()> { + async fn send_vermsg(&self, conn: &Conn<NetMsg>) -> Result<()> { let pids = self.protocol_versions.read().await; let protocols = pids.iter().map(|p| (p.0.clone(), p.1.v.clone())).collect(); drop(pids); @@ -270,16 +275,16 @@ impl PeerPool { }; trace!("Send VerMsg"); - codec.write(NetMsgCmd::Version, &vermsg).await?; + conn.send(NetMsg::new(NetMsgCmd::Version, &vermsg)?).await?; Ok(()) } /// Wait for a Version message /// /// Returns the peer's ID upon successfully receiving the Version message. - async fn wait_vermsg(&self, codec: &Codec) -> Result<PeerID> { - let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = codec.read_timeout(timeout).await?; + async fn wait_vermsg(&self, conn: &Conn<NetMsg>) -> Result<PeerID> { + let t = Duration::from_secs(self.config.handshake_timeout); + let msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Version, msg); let (vermsg, _) = decode::<VerMsg>(&payload)?; @@ -295,23 +300,23 @@ impl PeerPool { } /// Send a Verack message - async fn send_verack(&self, codec: &Codec, ack: bool) -> Result<()> { + async fn send_verack(&self, conn: &Conn<NetMsg>, ack: bool) -> Result<()> { let verack = VerAckMsg { peer_id: self.id.clone(), ack, }; trace!("Send VerAckMsg {:?}", verack); - codec.write(NetMsgCmd::Verack, &verack).await?; + conn.send(NetMsg::new(NetMsgCmd::Verack, &verack)?).await?; Ok(()) } /// Wait for a Verack message /// /// Returns the peer's ID upon successfully receiving the Verack message. - async fn wait_verack(&self, codec: &Codec) -> Result<PeerID> { - let timeout = Duration::from_secs(self.config.handshake_timeout); - let msg: NetMsg = codec.read_timeout(timeout).await?; + async fn wait_verack(&self, conn: &Conn<NetMsg>) -> Result<PeerID> { + let t = Duration::from_secs(self.config.handshake_timeout); + let msg: NetMsg = timeout(t, conn.recv()).await??; let payload = get_msg_payload!(Verack, msg); let (verack, _) = decode::<VerAckMsg>(&payload)?; diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index f28659c..6153ea1 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -92,7 +92,7 @@ impl EventValue for ProtocolEvent { /// let ex = Arc::new(Executor::new()); /// /// // Create a new Backend -/// let backend = Backend::new(&key_pair, config, ex); +/// let backend = Backend::new(&key_pair, config, ex.into()); /// /// // Attach the NewProtocol /// let c = move |peer| NewProtocol::new(peer); diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index f04e059..654644a 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -1,23 +1,19 @@ use std::{sync::Arc, time::Duration}; +use async_channel::{Receiver, Sender}; use async_trait::async_trait; use bincode::{Decode, Encode}; use log::trace; use rand::{rngs::OsRng, RngCore}; -use smol::{ - channel, - channel::{Receiver, Sender}, - stream::StreamExt, - Timer, -}; use karyon_core::{ - async_util::{select, timeout, Either, Executor, TaskGroup, TaskResult}, + async_runtime::Executor, + async_util::{select, sleep, timeout, Either, TaskGroup, TaskResult}, event::EventListener, util::decode, }; -use karyon_net::NetError; +use karyon_net::Error as NetError; use crate::{ peer::ArcPeer, @@ -38,12 +34,12 @@ pub struct PingProtocol { peer: ArcPeer, ping_interval: u64, ping_timeout: u64, - task_group: TaskGroup<'static>, + task_group: TaskGroup, } impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer, executor: Executor<'static>) -> ArcProtocol { + pub fn new(peer: ArcPeer, executor: Executor) -> ArcProtocol { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { @@ -87,12 +83,11 @@ impl PingProtocol { } async fn ping_loop(self: Arc<Self>, chan: Receiver<[u8; 32]>) -> Result<()> { - let mut timer = Timer::interval(Duration::from_secs(self.ping_interval)); let rng = &mut OsRng; let mut retry = 0; while retry < MAX_FAILUERS { - timer.next().await; + sleep(Duration::from_secs(self.ping_interval)).await; let mut ping_nonce: [u8; 32] = [0; 32]; rng.fill_bytes(&mut ping_nonce); @@ -130,8 +125,8 @@ impl Protocol for PingProtocol { async fn start(self: Arc<Self>) -> Result<()> { trace!("Start Ping protocol"); - let (pong_chan, pong_chan_recv) = channel::bounded(1); - let (stop_signal_s, stop_signal) = channel::bounded::<Result<()>>(1); + let (pong_chan, pong_chan_recv) = async_channel::bounded(1); + let (stop_signal_s, stop_signal) = async_channel::bounded::<Result<()>>(1); let selfc = self.clone(); self.task_group.spawn( diff --git a/p2p/src/routing_table/entry.rs b/p2p/src/routing_table/entry.rs index 3fc8a6b..1427c2b 100644 --- a/p2p/src/routing_table/entry.rs +++ b/p2p/src/routing_table/entry.rs @@ -5,6 +5,9 @@ use karyon_net::{Addr, Port}; /// Specifies the size of the key, in bytes. pub const KEY_SIZE: usize = 32; +/// The unique key identifying the peer. +pub type Key = [u8; KEY_SIZE]; + /// An Entry represents a peer in the routing table. #[derive(Encode, Decode, Clone, Debug)] pub struct Entry { @@ -20,14 +23,11 @@ pub struct Entry { impl PartialEq for Entry { fn eq(&self, other: &Self) -> bool { - // TODO: this should also compare both addresses (the self.addr == other.addr) + // XXX: should we compare both self.addr and other.addr??? self.key == other.key } } -/// The unique key identifying the peer. -pub type Key = [u8; KEY_SIZE]; - /// Calculates the XOR distance between two provided keys. /// /// The XOR distance is a metric used in Kademlia to measure the closeness diff --git a/p2p/src/routing_table/mod.rs b/p2p/src/routing_table/mod.rs index 6854546..bbf4801 100644 --- a/p2p/src/routing_table/mod.rs +++ b/p2p/src/routing_table/mod.rs @@ -266,7 +266,7 @@ impl RoutingTable { } /// Check if two addresses belong to the same subnet. -pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { +fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { match (addr, other_addr) { (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => { // TODO: Consider moving this to a different place @@ -275,6 +275,8 @@ pub fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool { } ip.octets()[0..3] == other_ip.octets()[0..3] } + + // TODO: check ipv6 _ => false, } } diff --git a/p2p/src/tls_config.rs b/p2p/src/tls_config.rs index 893c321..65d2adc 100644 --- a/p2p/src/tls_config.rs +++ b/p2p/src/tls_config.rs @@ -1,19 +1,25 @@ use std::sync::Arc; -use futures_rustls::rustls::{ - self, +#[cfg(feature = "smol")] +use futures_rustls::rustls; + +#[cfg(feature = "tokio")] +use tokio_rustls::rustls; + +use rustls::{ client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, crypto::{ aws_lc_rs::{self, cipher_suite::TLS13_CHACHA20_POLY1305_SHA256, kx_group}, CryptoProvider, SupportedKxGroup, }, - pki_types::{CertificateDer, PrivateKeyDer, ServerName, UnixTime}, server::danger::{ClientCertVerified, ClientCertVerifier}, CertificateError, DigitallySignedStruct, DistinguishedName, Error::InvalidCertificate, SignatureScheme, SupportedCipherSuite, SupportedProtocolVersion, }; +use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName, UnixTime}; + use log::error; use x509_parser::{certificate::X509Certificate, parse_x509_certificate}; |