aboutsummaryrefslogtreecommitdiff
path: root/core/src/async_runtime
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-04-11 10:19:20 +0200
committerhozan23 <hozan23@karyontech.net>2024-05-19 13:51:30 +0200
commit0992071a7f1a36424bcfaf1fbc84541ea041df1a (patch)
tree961d73218af672797d49f899289bef295bc56493 /core/src/async_runtime
parenta69917ecd8272a4946cfd12c75bf8f8c075b0e50 (diff)
add support for tokio & improve net crate api
Diffstat (limited to 'core/src/async_runtime')
-rw-r--r--core/src/async_runtime/executor.rs100
-rw-r--r--core/src/async_runtime/io.rs9
-rw-r--r--core/src/async_runtime/lock.rs5
-rw-r--r--core/src/async_runtime/mod.rs25
-rw-r--r--core/src/async_runtime/net.rs12
-rw-r--r--core/src/async_runtime/spawn.rs12
-rw-r--r--core/src/async_runtime/task.rs52
-rw-r--r--core/src/async_runtime/timer.rs1
8 files changed, 216 insertions, 0 deletions
diff --git a/core/src/async_runtime/executor.rs b/core/src/async_runtime/executor.rs
new file mode 100644
index 0000000..9335f12
--- /dev/null
+++ b/core/src/async_runtime/executor.rs
@@ -0,0 +1,100 @@
+use std::{future::Future, panic::catch_unwind, sync::Arc, thread};
+
+use once_cell::sync::OnceCell;
+
+#[cfg(feature = "smol")]
+pub use smol::Executor as SmolEx;
+
+#[cfg(feature = "tokio")]
+pub use tokio::runtime::Runtime;
+
+use super::Task;
+
+#[derive(Clone)]
+pub struct Executor {
+ #[cfg(feature = "smol")]
+ inner: Arc<SmolEx<'static>>,
+ #[cfg(feature = "tokio")]
+ inner: Arc<Runtime>,
+}
+
+impl Executor {
+ pub fn spawn<T: Send + 'static>(
+ &self,
+ future: impl Future<Output = T> + Send + 'static,
+ ) -> Task<T> {
+ self.inner.spawn(future).into()
+ }
+}
+
+static GLOBAL_EXECUTOR: OnceCell<Executor> = OnceCell::new();
+
+/// Returns a single-threaded global executor
+pub fn global_executor() -> Executor {
+ #[cfg(feature = "smol")]
+ fn init_executor() -> Executor {
+ let ex = smol::Executor::new();
+ thread::Builder::new()
+ .name("smol-executor".to_string())
+ .spawn(|| loop {
+ catch_unwind(|| {
+ smol::block_on(global_executor().inner.run(std::future::pending::<()>()))
+ })
+ .ok();
+ })
+ .expect("cannot spawn executor thread");
+ // Prevent spawning another thread by running the process driver on this
+ // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs
+ ex.spawn(async_process::driver()).detach();
+ Executor {
+ inner: Arc::new(ex),
+ }
+ }
+
+ #[cfg(feature = "tokio")]
+ fn init_executor() -> Executor {
+ let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime"));
+ let ex_cloned = ex.clone();
+ thread::Builder::new()
+ .name("tokio-executor".to_string())
+ .spawn(move || {
+ catch_unwind(|| ex_cloned.block_on(std::future::pending::<()>())).ok();
+ })
+ .expect("cannot spawn tokio runtime thread");
+ Executor { inner: ex }
+ }
+
+ GLOBAL_EXECUTOR.get_or_init(init_executor).clone()
+}
+
+#[cfg(feature = "smol")]
+impl From<Arc<smol::Executor<'static>>> for Executor {
+ fn from(ex: Arc<smol::Executor<'static>>) -> Executor {
+ Executor { inner: ex }
+ }
+}
+
+#[cfg(feature = "tokio")]
+impl From<Arc<tokio::runtime::Runtime>> for Executor {
+ fn from(rt: Arc<tokio::runtime::Runtime>) -> Executor {
+ Executor { inner: rt }
+ }
+}
+
+#[cfg(feature = "smol")]
+impl From<smol::Executor<'static>> for Executor {
+ fn from(ex: smol::Executor<'static>) -> Executor {
+ Executor {
+ inner: Arc::new(ex),
+ }
+ }
+}
+
+#[cfg(feature = "tokio")]
+impl From<tokio::runtime::Runtime> for Executor {
+ fn from(rt: tokio::runtime::Runtime) -> Executor {
+ Executor {
+ inner: Arc::new(rt),
+ }
+ }
+}
diff --git a/core/src/async_runtime/io.rs b/core/src/async_runtime/io.rs
new file mode 100644
index 0000000..161c258
--- /dev/null
+++ b/core/src/async_runtime/io.rs
@@ -0,0 +1,9 @@
+#[cfg(feature = "smol")]
+pub use smol::io::{
+ split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
+};
+
+#[cfg(feature = "tokio")]
+pub use tokio::io::{
+ split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
+};
diff --git a/core/src/async_runtime/lock.rs b/core/src/async_runtime/lock.rs
new file mode 100644
index 0000000..fc84d1d
--- /dev/null
+++ b/core/src/async_runtime/lock.rs
@@ -0,0 +1,5 @@
+#[cfg(feature = "smol")]
+pub use smol::lock::{Mutex, MutexGuard, OnceCell, RwLock};
+
+#[cfg(feature = "tokio")]
+pub use tokio::sync::{Mutex, MutexGuard, OnceCell, RwLock};
diff --git a/core/src/async_runtime/mod.rs b/core/src/async_runtime/mod.rs
new file mode 100644
index 0000000..d91d01b
--- /dev/null
+++ b/core/src/async_runtime/mod.rs
@@ -0,0 +1,25 @@
+mod executor;
+pub mod io;
+pub mod lock;
+pub mod net;
+mod spawn;
+mod task;
+mod timer;
+
+pub use executor::{global_executor, Executor};
+pub use spawn::spawn;
+pub use task::Task;
+
+#[cfg(test)]
+pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T {
+ #[cfg(feature = "smol")]
+ let result = smol::block_on(future);
+ #[cfg(feature = "tokio")]
+ let result = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(future);
+
+ result
+}
diff --git a/core/src/async_runtime/net.rs b/core/src/async_runtime/net.rs
new file mode 100644
index 0000000..5c004ce
--- /dev/null
+++ b/core/src/async_runtime/net.rs
@@ -0,0 +1,12 @@
+pub use std::os::unix::net::SocketAddr;
+
+#[cfg(feature = "smol")]
+pub use smol::net::{
+ unix::{SocketAddr as UnixSocketAddr, UnixListener, UnixStream},
+ TcpListener, TcpStream, UdpSocket,
+};
+
+#[cfg(feature = "tokio")]
+pub use tokio::net::{
+ unix::SocketAddr as UnixSocketAddr, TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream,
+};
diff --git a/core/src/async_runtime/spawn.rs b/core/src/async_runtime/spawn.rs
new file mode 100644
index 0000000..2760982
--- /dev/null
+++ b/core/src/async_runtime/spawn.rs
@@ -0,0 +1,12 @@
+use std::future::Future;
+
+use super::Task;
+
+pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
+ #[cfg(feature = "smol")]
+ let result: Task<T> = smol::spawn(future).into();
+ #[cfg(feature = "tokio")]
+ let result: Task<T> = tokio::spawn(future).into();
+
+ result
+}
diff --git a/core/src/async_runtime/task.rs b/core/src/async_runtime/task.rs
new file mode 100644
index 0000000..a681b0f
--- /dev/null
+++ b/core/src/async_runtime/task.rs
@@ -0,0 +1,52 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use crate::error::Error;
+
+pub struct Task<T> {
+ #[cfg(feature = "smol")]
+ inner_task: smol::Task<T>,
+ #[cfg(feature = "tokio")]
+ inner_task: tokio::task::JoinHandle<T>,
+}
+
+impl<T> Task<T> {
+ pub async fn cancel(self) {
+ #[cfg(feature = "smol")]
+ self.inner_task.cancel().await;
+ #[cfg(feature = "tokio")]
+ self.inner_task.abort();
+ }
+}
+
+impl<T> Future for Task<T> {
+ type Output = Result<T, Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[cfg(feature = "smol")]
+ let result = smol::Task::poll(Pin::new(&mut self.inner_task), cx);
+ #[cfg(feature = "tokio")]
+ let result = tokio::task::JoinHandle::poll(Pin::new(&mut self.inner_task), cx);
+
+ #[cfg(feature = "smol")]
+ return result.map(Ok);
+
+ #[cfg(feature = "tokio")]
+ return result.map_err(|e| e.into());
+ }
+}
+
+#[cfg(feature = "smol")]
+impl<T> From<smol::Task<T>> for Task<T> {
+ fn from(t: smol::Task<T>) -> Task<T> {
+ Task { inner_task: t }
+ }
+}
+
+#[cfg(feature = "tokio")]
+impl<T> From<tokio::task::JoinHandle<T>> for Task<T> {
+ fn from(t: tokio::task::JoinHandle<T>) -> Task<T> {
+ Task { inner_task: t }
+ }
+}
diff --git a/core/src/async_runtime/timer.rs b/core/src/async_runtime/timer.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/core/src/async_runtime/timer.rs
@@ -0,0 +1 @@
+