// twizzler_async/run.rs
use std::{
future::Future,
task::{Context, Poll},
thread,
};
use crate::{
event::FlagEvent, exec::Executor, reactor::Reactor, thread_local::ThreadLocalExecutor, throttle,
};
/// Calls `f` right away and hands back whatever it returns.
///
/// This is a plain passthrough. [`run`] uses it as the innermost layer when it nests the
/// executor `enter` scopes around its main loop.
pub(crate) fn enter<T>(f: impl FnOnce() -> T) -> T {
    f()
}
/// Runs executors, driving `future` to completion on the current thread.
///
/// We run both the thread-local executor and the global executor, and also check for timer
/// events. If we cannot make progress, we call the reactor, which handles waiting and waking up
/// on [crate::Async] and [crate::AsyncDuplex] objects for use in externally signaled events that
/// control non-blocking closures' readiness.
///
/// Each pass of the loop polls `future`, executes the thread-local executor, executes this
/// thread's worker of the global executor, and then makes a `try_only` call into the reactor.
/// When neither executor reported work, the thread yields a few times before making a
/// non-`try_only` reactor call.
///
/// Returns the output of `future` as soon as a poll reports it ready.
///
/// # Examples
/// ```no_run
/// // Run executors on the current thread.
/// twizzler_async::run(async {
///     println!("Hello!");
/// });
/// ```
///
/// Multi-threaded:
/// ```no_run
/// use futures::future;
/// let num_threads = 4;
/// for _ in 0..num_threads {
///     // Spawn a pending future.
///     std::thread::spawn(|| twizzler_async::run(future::pending::<()>()));
/// }
///
/// twizzler_async::block_on(async {
///     twizzler_async::Task::spawn(async {
///         println!("Hello from executor thread!");
///     })
///     .await;
/// });
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
    // Idle passes (no executor reported work) before the non-`try_only` reactor call.
    const IDLE_ROUNDS_BEFORE_WAIT: i32 = 4;

    let local = ThreadLocalExecutor::new();
    let exec = Executor::get();
    let worker = exec.worker();
    let reactor = Reactor::get();

    // Waking the main future notifies the thread-local executor's event.
    let wake_event = local.event().clone();
    let waker = async_task::waker_fn(move || wake_event.notify());
    let mut ctx = Context::from_waker(&waker);
    futures_util::pin_mut!(future);

    // Nest the scopes: worker on the outside, then the thread-local executor, then `enter`.
    let enter_local = |f| local.enter(|| enter(f));
    let enter_all = |f| worker.enter(|| enter_local(f));

    enter_all(|| {
        let flag_events = [local.event(), exec.event()];
        let mut idle_rounds = 0;

        loop {
            // Poll the main future first; finishing it ends the whole run.
            match throttle::setup(|| future.as_mut().poll(&mut ctx)) {
                Poll::Ready(output) => return output,
                Poll::Pending => {}
            }

            // Give both executors a turn, thread-local one first.
            let local_busy = local.execute();
            let worker_busy = worker.execute();
            let any_busy = worker_busy || local_busy;

            react(reactor, &flag_events, any_busy, true);
            if any_busy {
                idle_rounds = 0;
                continue;
            }

            // Nothing ran: yield a few times, then fall back to the non-`try_only` reactor call.
            idle_rounds += 1;
            if idle_rounds < IDLE_ROUNDS_BEFORE_WAIT {
                thread::yield_now();
            } else {
                idle_rounds = 0;
                react(reactor, &flag_events, false, false);
            }
        }
    })
}
/// Clears the flag events and then makes one call into the reactor.
///
/// Every event in `flag_events` is cleared up front. If any `clear` call returns `true`
/// (presumably meaning the event had been signaled; confirm against `FlagEvent`), or the caller
/// passed `more_tasks = true`, only `Reactor::poll` is called. Otherwise `Reactor::wait` is
/// called and, unless `try_only` is set, every event is cleared once more afterward.
///
/// `try_only` is forwarded unchanged to whichever reactor method is called.
fn react(reactor: &Reactor, flag_events: &[&FlagEvent], mut more_tasks: bool, try_only: bool) {
    // `clear()` sits on the left of `||`, so it runs for every event with no short-circuit.
    more_tasks = flag_events
        .iter()
        .fold(more_tasks, |pending, ev| ev.clear() || pending);

    if more_tasks {
        reactor.poll(flag_events, try_only);
        return;
    }

    reactor.wait(flag_events, try_only);
    if try_only {
        return;
    }

    // After a non-`try_only` wait, reset all events again.
    flag_events.iter().for_each(|ev| {
        ev.clear();
    });
}