From 0992071a7f1a36424bcfaf1fbc84541ea041df1a Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 11 Apr 2024 10:19:20 +0200 Subject: add support for tokio & improve net crate api --- core/src/async_runtime/executor.rs | 100 +++++++++++++++++++++++++++++++++++++ core/src/async_runtime/io.rs | 9 ++++ core/src/async_runtime/lock.rs | 5 ++ core/src/async_runtime/mod.rs | 25 ++++++++++ core/src/async_runtime/net.rs | 12 +++++ core/src/async_runtime/spawn.rs | 12 +++++ core/src/async_runtime/task.rs | 52 +++++++++++++++++++ core/src/async_runtime/timer.rs | 1 + 8 files changed, 216 insertions(+) create mode 100644 core/src/async_runtime/executor.rs create mode 100644 core/src/async_runtime/io.rs create mode 100644 core/src/async_runtime/lock.rs create mode 100644 core/src/async_runtime/mod.rs create mode 100644 core/src/async_runtime/net.rs create mode 100644 core/src/async_runtime/spawn.rs create mode 100644 core/src/async_runtime/task.rs create mode 100644 core/src/async_runtime/timer.rs (limited to 'core/src/async_runtime') diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs new file mode 100644 index 0000000..9335f12 --- /dev/null +++ b/core/src/async_runtime/executor.rs @@ -0,0 +1,100 @@ +use std::{future::Future, panic::catch_unwind, sync::Arc, thread}; + +use once_cell::sync::OnceCell; + +#[cfg(feature = "smol")] +pub use smol::Executor as SmolEx; + +#[cfg(feature = "tokio")] +pub use tokio::runtime::Runtime; + +use super::Task; + +#[derive(Clone)] +pub struct Executor { + #[cfg(feature = "smol")] + inner: Arc>, + #[cfg(feature = "tokio")] + inner: Arc, +} + +impl Executor { + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + self.inner.spawn(future).into() + } +} + +static GLOBAL_EXECUTOR: OnceCell = OnceCell::new(); + +/// Returns a single-threaded global executor +pub fn global_executor() -> Executor { + #[cfg(feature = "smol")] + fn init_executor() -> Executor { + let ex = smol::Executor::new(); + thread::Builder::new() + .name("smol-executor".to_string()) + .spawn(|| loop { + catch_unwind(|| { + smol::block_on(global_executor().inner.run(std::future::pending::<()>())) + }) + .ok(); + }) + .expect("cannot spawn executor thread"); + // Prevent spawning another thread by running the process driver on this + // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs + ex.spawn(async_process::driver()).detach(); + Executor { + inner: Arc::new(ex), + } + } + + #[cfg(feature = "tokio")] + fn init_executor() -> Executor { + let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime")); + let ex_cloned = ex.clone(); + thread::Builder::new() + .name("tokio-executor".to_string()) + .spawn(move || { + catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok(); + }) + .expect("cannot spawn tokio runtime thread"); + Executor { inner: ex } + } + + GLOBAL_EXECUTOR.get_or_init(init_executor).clone() +} + +#[cfg(feature = "smol")] +impl From>> for Executor { + fn from(ex: Arc>) -> Executor { + Executor { inner: ex } + } +} + +#[cfg(feature = "tokio")] +impl From> for Executor { + fn from(rt: Arc) -> Executor { + Executor { inner: rt } + } +} + +#[cfg(feature = "smol")] +impl From> for Executor { + fn from(ex: smol::Executor<'static>) -> Executor { + Executor { + inner: Arc::new(ex), + } + } +} + +#[cfg(feature = "tokio")] +impl From for Executor { + fn from(rt: tokio::runtime::Runtime) -> Executor { + Executor { + inner: Arc::new(rt), + } + } +} diff --git a/core/src/async_runtime/io.rs b/core/src/async_runtime/io.rs new file mode 100644 index 0000000..161c258 --- /dev/null +++ b/core/src/async_runtime/io.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "smol")] +pub use smol::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; + +#[cfg(feature = "tokio")] +pub use tokio::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; diff --git a/core/src/async_runtime/lock.rs b/core/src/async_runtime/lock.rs new file mode 100644 index 0000000..fc84d1d --- /dev/null +++ b/core/src/async_runtime/lock.rs @@ -0,0 +1,5 @@ +#[cfg(feature = "smol")] +pub use smol::lock::{Mutex, MutexGuard, OnceCell, RwLock}; + +#[cfg(feature = "tokio")] +pub use tokio::sync::{Mutex, MutexGuard, OnceCell, RwLock}; diff --git a/core/src/async_runtime/mod.rs b/core/src/async_runtime/mod.rs new file mode 100644 index 0000000..d91d01b --- /dev/null +++ b/core/src/async_runtime/mod.rs @@ -0,0 +1,25 @@ +mod executor; +pub mod io; +pub mod lock; +pub mod net; +mod spawn; +mod task; +mod timer; + +pub use executor::{global_executor, Executor}; +pub use spawn::spawn; +pub use task::Task; + +#[cfg(test)] +pub fn block_on(future: impl std::future::Future) -> T { + #[cfg(feature = "smol")] + let result = smol::block_on(future); + #[cfg(feature = "tokio")] + let result = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future); + + result +} diff --git a/core/src/async_runtime/net.rs b/core/src/async_runtime/net.rs new file mode 100644 index 0000000..5c004ce --- /dev/null +++ b/core/src/async_runtime/net.rs @@ -0,0 +1,12 @@ +pub use std::os::unix::net::SocketAddr; + +#[cfg(feature = "smol")] +pub use smol::net::{ + unix::{SocketAddr as UnixSocketAddr, UnixListener, UnixStream}, + TcpListener, TcpStream, UdpSocket, +}; + +#[cfg(feature = "tokio")] +pub use tokio::net::{ + unix::SocketAddr as UnixSocketAddr, TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream, +}; diff --git a/core/src/async_runtime/spawn.rs b/core/src/async_runtime/spawn.rs new file mode 100644 index 0000000..2760982 --- /dev/null +++ b/core/src/async_runtime/spawn.rs @@ -0,0 +1,12 @@ +use std::future::Future; + +use super::Task; + +pub fn spawn(future: impl Future + Send + 'static) -> Task { + #[cfg(feature = "smol")] + let result: Task = smol::spawn(future).into(); + #[cfg(feature = "tokio")] + let result: Task = tokio::spawn(future).into(); + + result +} diff --git a/core/src/async_runtime/task.rs b/core/src/async_runtime/task.rs new file mode 100644 index 0000000..a681b0f --- /dev/null +++ b/core/src/async_runtime/task.rs @@ -0,0 +1,52 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::error::Error; + +pub struct Task { + #[cfg(feature = "smol")] + inner_task: smol::Task, + #[cfg(feature = "tokio")] + inner_task: tokio::task::JoinHandle, +} + +impl Task { + pub async fn cancel(self) { + #[cfg(feature = "smol")] + self.inner_task.cancel().await; + #[cfg(feature = "tokio")] + self.inner_task.abort(); + } +} + +impl Future for Task { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + #[cfg(feature = "smol")] + let result = smol::Task::poll(Pin::new(&mut self.inner_task), cx); + #[cfg(feature = "tokio")] + let result = tokio::task::JoinHandle::poll(Pin::new(&mut self.inner_task), cx); + + #[cfg(feature = "smol")] + return result.map(Ok); + + #[cfg(feature = "tokio")] + return result.map_err(|e| e.into()); + } +} + +#[cfg(feature = "smol")] +impl From> for Task { + fn from(t: smol::Task) -> Task { + Task { inner_task: t } + } +} + +#[cfg(feature = "tokio")] +impl From> for Task { + fn from(t: tokio::task::JoinHandle) -> Task { + Task { inner_task: t } + } +} diff --git a/core/src/async_runtime/timer.rs b/core/src/async_runtime/timer.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/core/src/async_runtime/timer.rs @@ -0,0 +1 @@ + -- cgit v1.2.3