twizzler_async/future.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
use std::{
future::Future,
sync::{Arc, Mutex},
task::{Poll, Waker},
time::{Duration, Instant},
};
use futures_util::FutureExt;
use crate::Timer;
/// A future that waits on two sub-futures until the first completes.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WaitForFirst<FutOne, FutTwo> {
one: FutOne,
two: Option<FutTwo>,
}
impl<FutOne: Unpin, FutTwo: Unpin> Unpin for WaitForFirst<FutOne, FutTwo> {}
/// A future that waits on two sub-futures until the first one completes. If the second one
/// completes first, this future will continue awaiting on the first future. If the first one
/// completes first, this future returns immediately without continuing to wait on the second
/// future.
pub fn wait_for_first<FutOne, FutTwo, T, R>(
one: FutOne,
two: FutTwo,
) -> WaitForFirst<FutOne, FutTwo>
where
FutOne: Future<Output = T>,
FutTwo: Future<Output = R>,
{
WaitForFirst {
one,
two: Some(two),
}
}
impl<FutOne, FutTwo> WaitForFirst<FutOne, FutTwo> {
pub fn into_inner(self) -> (FutOne, Option<FutTwo>) {
(self.one, self.two)
}
}
impl<FutOne: Future + Unpin, FutTwo: Future + Unpin> Future for WaitForFirst<FutOne, FutTwo> {
type Output = FutOne::Output;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if let Poll::Ready(e) = self.one.poll_unpin(cx) {
return Poll::Ready(e);
}
if let Some(two) = &mut self.two {
if two.poll_unpin(cx).is_ready() {
self.two = None;
}
}
Poll::Pending
}
}
#[derive(Default)]
pub struct FlagBlockInner {
wakers: Vec<Waker>,
epoch: u64,
}
#[derive(Default)]
/// A basic condition variable for async tasks. If you call wait() you get back a future that you
/// can await on, which will complete once another tasks calls signal_all(). But there's a gotcha
/// here.
///
/// Okay so you know the rule with mutexes and condition variables? Like, you have some predicate
/// that tells you "ready" or not, and this is tested with the mutex held, followed by waiting on
/// the condition variable (which automatically releases and reaquires the lock).
///
/// We have something similar. You need to call wait() with that mutex held, and then _after_ you
/// release the lock, you call await on the future returned by wait().
pub struct FlagBlock {
inner: Arc<Mutex<FlagBlockInner>>,
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FlagBlockFuture<'a> {
state: &'a FlagBlock,
val: u64,
added: bool,
}
impl FlagBlock {
/// Construct a new FlagBlock.
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(FlagBlockInner {
wakers: vec![],
epoch: 0,
})),
}
}
/// Signal anyone waiting.
pub fn signal_all(&self) {
let mut inner = self.inner.lock().unwrap();
inner.epoch += 1;
while let Some(w) = inner.wakers.pop() {
w.wake();
}
}
/// Return an awaitable future for the "readiness" of this condition variable.
pub fn wait(&self) -> FlagBlockFuture {
let inner = self.inner.lock().unwrap();
FlagBlockFuture {
state: self,
added: false,
val: inner.epoch,
}
}
}
impl<'a> Future for FlagBlockFuture<'a> {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
let mut inner = self.state.inner.lock().unwrap();
if inner.epoch != self.val {
Poll::Ready(())
} else {
if !self.added {
inner.wakers.push(cx.waker().clone());
drop(inner);
self.added = true;
}
Poll::Pending
}
}
}
/// A future that awaits a sub-future until a timeout occurs.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timeout<T> {
value: T,
delay: Timer,
}
impl<T> Timeout<T> {
pub fn after(f: T, dur: Duration) -> Self {
Self {
value: f,
delay: Timer::after(dur),
}
}
pub fn at(f: T, at: Instant) -> Self {
Self {
value: f,
delay: Timer::at(at),
}
}
}
impl<T: Future + Unpin> Future for Timeout<T> {
type Output = Option<T::Output>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.value.poll_unpin(cx) {
Poll::Ready(res) => return Poll::Ready(Some(res)),
Poll::Pending => {}
}
match self.delay.poll_unpin(cx) {
Poll::Ready(_) => return Poll::Ready(None),
Poll::Pending => {}
}
Poll::Pending
}
}
/// Await a future until a timeout occurs (or that future completes). If the timeout happens, return
/// None, otherwise return Some of the result of the future. This timeout expires after a duration.
pub async fn timeout_after<F: Future>(f: F, dur: Duration) -> Option<F::Output> {
Timeout::after(Box::pin(f), dur).await
}
/// Await a future until a timeout occurs (or that future completes). If the timeout happens, return
/// None, otherwise return Some of the result of the future. This timeout expires at an instant in
/// time.
pub async fn timeout_at<F: Future>(f: F, at: Instant) -> Option<F::Output> {
Timeout::at(Box::pin(f), at).await
}