diff options
author | hozan23 <hozan23@karyontech.net> | 2024-04-11 10:19:20 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-05-19 13:51:30 +0200 |
commit | 0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch) | |
tree | 961d73218af672797d49f899289bef295bc56493 /core/src/async_runtime | |
parent | a69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff) |
add support for tokio & improve net crate api
Diffstat (limited to 'core/src/async_runtime')
-rw-r--r-- | core/src/async_runtime/executor.rs | 100 | ||||
-rw-r--r-- | core/src/async_runtime/io.rs | 9 | ||||
-rw-r--r-- | core/src/async_runtime/lock.rs | 5 | ||||
-rw-r--r-- | core/src/async_runtime/mod.rs | 25 | ||||
-rw-r--r-- | core/src/async_runtime/net.rs | 12 | ||||
-rw-r--r-- | core/src/async_runtime/spawn.rs | 12 | ||||
-rw-r--r-- | core/src/async_runtime/task.rs | 52 | ||||
-rw-r--r-- | core/src/async_runtime/timer.rs | 1 |
8 files changed, 216 insertions, 0 deletions
diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs new file mode 100644 index 0000000..9335f12 --- /dev/null +++ b/core/src/async_runtime/executor.rs @@ -0,0 +1,100 @@ +use std::{future::Future, panic::catch_unwind, sync::Arc, thread}; + +use once_cell::sync::OnceCell; + +#[cfg(feature = "smol")] +pub use smol::Executor as SmolEx; + +#[cfg(feature = "tokio")] +pub use tokio::runtime::Runtime; + +use super::Task; + +#[derive(Clone)] +pub struct Executor { + #[cfg(feature = "smol")] + inner: Arc<SmolEx<'static>>, + #[cfg(feature = "tokio")] + inner: Arc<Runtime>, +} + +impl Executor { + pub fn spawn<T: Send + 'static>( + &self, + future: impl Future<Output = T> + Send + 'static, + ) -> Task<T> { + self.inner.spawn(future).into() + } +} + +static GLOBAL_EXECUTOR: OnceCell<Executor> = OnceCell::new(); + +/// Returns a single-threaded global executor +pub fn global_executor() -> Executor { + #[cfg(feature = "smol")] + fn init_executor() -> Executor { + let ex = smol::Executor::new(); + thread::Builder::new() + .name("smol-executor".to_string()) + .spawn(|| loop { + catch_unwind(|| { + smol::block_on(global_executor().inner.run(std::future::pending::<()>())) + }) + .ok(); + }) + .expect("cannot spawn executor thread"); + // Prevent spawning another thread by running the process driver on this + // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs + ex.spawn(async_process::driver()).detach(); + Executor { + inner: Arc::new(ex), + } + } + + #[cfg(feature = "tokio")] + fn init_executor() -> Executor { + let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime")); + let ex_cloned = ex.clone(); + thread::Builder::new() + .name("tokio-executor".to_string()) + .spawn(move || { + catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok(); + }) + .expect("cannot spawn tokio runtime thread"); + Executor { inner: ex } + } + + GLOBAL_EXECUTOR.get_or_init(init_executor).clone() +} + +#[cfg(feature = "smol")] +impl From<Arc<smol::Executor<'static>>> for Executor { + fn from(ex: Arc<smol::Executor<'static>>) -> Executor { + Executor { inner: ex } + } +} + +#[cfg(feature = "tokio")] +impl From<Arc<tokio::runtime::Runtime>> for Executor { + fn from(rt: Arc<tokio::runtime::Runtime>) -> Executor { + Executor { inner: rt } + } +} + +#[cfg(feature = "smol")] +impl From<smol::Executor<'static>> for Executor { + fn from(ex: smol::Executor<'static>) -> Executor { + Executor { + inner: Arc::new(ex), + } + } +} + +#[cfg(feature = "tokio")] +impl From<tokio::runtime::Runtime> for Executor { + fn from(rt: tokio::runtime::Runtime) -> Executor { + Executor { + inner: Arc::new(rt), + } + } +} diff --git a/core/src/async_runtime/io.rs b/core/src/async_runtime/io.rs new file mode 100644 index 0000000..161c258 --- /dev/null +++ b/core/src/async_runtime/io.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "smol")] +pub use smol::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; + +#[cfg(feature = "tokio")] +pub use tokio::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, +}; diff --git a/core/src/async_runtime/lock.rs b/core/src/async_runtime/lock.rs new file mode 100644 index 0000000..fc84d1d --- /dev/null +++ b/core/src/async_runtime/lock.rs @@ -0,0 +1,5 @@ +#[cfg(feature = "smol")] +pub use smol::lock::{Mutex, MutexGuard, OnceCell, RwLock}; + +#[cfg(feature = "tokio")] +pub use tokio::sync::{Mutex, MutexGuard, OnceCell, RwLock}; diff --git a/core/src/async_runtime/mod.rs b/core/src/async_runtime/mod.rs new file mode 100644 index 0000000..d91d01b --- /dev/null +++ b/core/src/async_runtime/mod.rs @@ -0,0 +1,25 @@ +mod executor; +pub mod io; +pub mod lock; +pub mod net; +mod spawn; +mod task; +mod timer; + +pub use executor::{global_executor, Executor}; +pub use spawn::spawn; +pub use task::Task; + +#[cfg(test)] +pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T { + #[cfg(feature = "smol")] + let result = smol::block_on(future); + #[cfg(feature = "tokio")] + let result = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future); + + result +} diff --git a/core/src/async_runtime/net.rs b/core/src/async_runtime/net.rs new file mode 100644 index 0000000..5c004ce --- /dev/null +++ b/core/src/async_runtime/net.rs @@ -0,0 +1,12 @@ +pub use std::os::unix::net::SocketAddr; + +#[cfg(feature = "smol")] +pub use smol::net::{ + unix::{SocketAddr as UnixSocketAddr, UnixListener, UnixStream}, + TcpListener, TcpStream, UdpSocket, +}; + +#[cfg(feature = "tokio")] +pub use tokio::net::{ + unix::SocketAddr as UnixSocketAddr, TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream, +}; diff --git a/core/src/async_runtime/spawn.rs b/core/src/async_runtime/spawn.rs new file mode 100644 index 0000000..2760982 --- /dev/null +++ b/core/src/async_runtime/spawn.rs @@ -0,0 +1,12 @@ +use std::future::Future; + +use super::Task; + +pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { + #[cfg(feature = "smol")] + let result: Task<T> = smol::spawn(future).into(); + #[cfg(feature = "tokio")] + let result: Task<T> = tokio::spawn(future).into(); + + result +} diff --git a/core/src/async_runtime/task.rs b/core/src/async_runtime/task.rs new file mode 100644 index 0000000..a681b0f --- /dev/null +++ b/core/src/async_runtime/task.rs @@ -0,0 +1,52 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::error::Error; + +pub struct Task<T> { + #[cfg(feature = "smol")] + inner_task: smol::Task<T>, + #[cfg(feature = "tokio")] + inner_task: tokio::task::JoinHandle<T>, +} + +impl<T> Task<T> { + pub async fn cancel(self) { + #[cfg(feature = "smol")] + self.inner_task.cancel().await; + #[cfg(feature = "tokio")] + self.inner_task.abort(); + } +} + +impl<T> Future for Task<T> { + type Output = Result<T, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[cfg(feature = "smol")] + let result = smol::Task::poll(Pin::new(&mut self.inner_task), cx); + #[cfg(feature = "tokio")] + let result = tokio::task::JoinHandle::poll(Pin::new(&mut self.inner_task), cx); + + #[cfg(feature = "smol")] + return result.map(Ok); + + #[cfg(feature = "tokio")] + return result.map_err(|e| e.into()); + } +} + +#[cfg(feature = "smol")] +impl<T> From<smol::Task<T>> for Task<T> { + fn from(t: smol::Task<T>) -> Task<T> { + Task { inner_task: t } + } +} + +#[cfg(feature = "tokio")] +impl<T> From<tokio::task::JoinHandle<T>> for Task<T> { + fn from(t: tokio::task::JoinHandle<T>) -> Task<T> { + Task { inner_task: t } + } +} diff --git a/core/src/async_runtime/timer.rs b/core/src/async_runtime/timer.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/core/src/async_runtime/timer.rs @@ -0,0 +1 @@ + |