twizzler_async/task.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::thread_local::ThreadLocalExecutor;
pub(crate) type Runnable = async_task::Task<u32>;
/// A spawned future. Tasks are futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it is automatically canceled and it won't be polled again. You can also
/// cancel a task explicitly with the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic are immediately canceled, and awaiting a canceled task causes a panic. If the
/// future panics, the panic will be unwound into the [`run()`][crate::run()] invocation that polled
/// it, but this doesn't apply to the blocking executor, which will simply ignore panics and
/// continue running.
#[must_use = "futures do nothing unless you `.await` or poll them; tasks, specifically, get canceled if you drop them, use `.detach()` to run them in the background"]
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, u32>>);
impl<T: 'static> Task<T> {
/// Spawns a future onto the thread-local executor.
///
/// Panics if the current thread is not inside an invocation of [`run()`][crate::run()].
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
ThreadLocalExecutor::spawn(future)
}
}
impl<T: Send + 'static> Task<T> {
/// Spawns a future onto the global executor.
///
/// This future may be stolen and polled by any thread calling [`run()`][crate::run()], and thus
/// the future (and its output) must be Send.
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
crate::exec::Executor::get().spawn(future)
}
}
impl<T, E> Task<Result<T, E>>
where
T: Send + 'static,
E: std::fmt::Debug + Send + 'static,
{
/// Spawns a new task and awaits and unwraps the result.
pub fn unwrap(self) -> Task<T> {
Task::spawn(async { self.await.unwrap() })
}
/// Spawns a new task and awaits and unwraps the result, panicing with the provided message if
/// the unwrap fails.
pub fn expect(self, msg: &str) -> Task<T> {
let msg = msg.to_owned();
Task::spawn(async move { self.await.expect(&msg) })
}
}
impl Task<()> {
/// Detach the task and let it run in the background.
/// # Examples
///
/// ```no_run
/// use std::time::Duration;
///
/// use twizzler_async::{Task, Timer};
///
/// # twizzler_async::run(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(mut self) {
self.0.take().unwrap();
}
}
impl<T> Task<T> {
/// Cancels the task and waits for it to stop running. If the task completed before canceling,
/// return the task's output, or `None` if it wasn't complete. The advantage of calling
/// `cancel()` explicitly over jus dropping the task is that it, one, waits for the task to stop
/// running before returning, and two, it returns the result if the task _did_ successfully
/// complete.
pub async fn cancel(self) -> Option<T> {
let handle = { self }.0.take().unwrap();
handle.cancel();
handle.await
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
if let Some(handle) = &self.0 {
handle.cancel()
}
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) {
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
Poll::Pending => Poll::Pending,
}
}
}
impl<T> From<Task<T>> for async_task::JoinHandle<T, u32> {
fn from(mut t: Task<T>) -> Self {
t.0.take().expect("task was already canceled or failed")
}
}