twizzler_async/async_source.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
use std::sync::Arc;
use twizzler_abi::syscall::ThreadSyncSleep;
use crate::reactor::{Reactor, Source};
/// Implement setting up externally signaled asynchronous events for the async runner to wait for,
/// in the case where there is a single "runnable" abstraction for this object.
pub trait AsyncSetup {
/// The error type returned by any closures run.
type Error: PartialEq;
/// The specific variant of the error type that indicates that an operation would block.
const WOULD_BLOCK: Self::Error;
/// Return a thread sync sleep operation specification for this handle.
fn setup_sleep(&self) -> ThreadSyncSleep;
}
#[derive(Debug)]
/// A wrapper type around some "handle" that we want to perform asynchronous operations on, where
/// that handle must implement [AsyncSetup].
pub struct Async<T> {
source: Arc<Source>,
handle: Option<Box<T>>,
}
impl<T: AsyncSetup> Async<T> {
/// Construct a new Async<T>.
pub fn new(handle: T) -> Self {
Self {
source: Reactor::get().insert_wait_op(handle.setup_sleep()),
handle: Some(Box::new(handle)),
}
}
/// Return a reference to the underlying handle.
pub fn get_ref(&self) -> &T {
self.handle.as_ref().unwrap()
}
/// Consume this Async<T> and return the handle.
pub fn into_inner(mut self) -> T {
let handle = *self.handle.take().unwrap();
Reactor::get().remove_wait_op(&self.source);
handle
}
/// Asynchronously run an operation that will sleep if not ready. The closure to run must return
/// `Result<_, T::Error>`, and should return `Err(T::WOULD_BLOCK)` if the operation is not
/// ready.
pub async fn run_with<R>(
&self,
op: impl FnMut(&T) -> Result<R, T::Error>,
) -> Result<R, T::Error> {
let mut op = op;
loop {
let sleep_op = self.get_ref().setup_sleep();
match op(self.get_ref()) {
Err(e) if e == T::WOULD_BLOCK => {}
res => return res,
}
self.source.runnable(sleep_op).await;
}
}
}
impl<T> Drop for Async<T> {
fn drop(&mut self) {
if self.handle.is_some() {
let _ = Reactor::get().remove_wait_op(&self.source);
self.handle.take();
}
}
}
/// Implement setting up externally signaled asynchronous events for the async runner to wait for,
/// in the case where there is a duplex mode for reading and writing to this object, each of which
/// could fail with some "would block" error.
pub trait AsyncDuplexSetup {
/// The error type returned by read operations.
type ReadError: PartialEq;
/// The error type returned by write operations.
type WriteError: PartialEq;
/// The specific variant of the error type that indicates that a read operation would block.
const READ_WOULD_BLOCK: Self::ReadError;
/// The specific variant of the error type that indicates that a write operation would block.
const WRITE_WOULD_BLOCK: Self::WriteError;
/// Return a thread sync sleep operation specification for reading from this handle.
fn setup_read_sleep(&self) -> ThreadSyncSleep;
/// Return a thread sync sleep operation specification for writing to this handle.
fn setup_write_sleep(&self) -> ThreadSyncSleep;
}
/// A wrapper type around some "handle" that we want to perform asynchronous operations on, where
/// that handle must implement [AsyncDuplexSetup].
pub struct AsyncDuplex<T> {
read_source: Arc<Source>,
write_source: Arc<Source>,
handle: Option<Box<T>>,
}
impl<T: AsyncDuplexSetup> AsyncDuplex<T> {
/// Construct a new `Async<T>`.
pub fn new(handle: T) -> Self {
Self {
read_source: Reactor::get().insert_wait_op(handle.setup_read_sleep()),
write_source: Reactor::get().insert_wait_op(handle.setup_write_sleep()),
handle: Some(Box::new(handle)),
}
}
/// Consume the wrapper and return the underlying handle.
pub fn into_inner(mut self) -> T {
let handle = *self.handle.take().unwrap();
Reactor::get().remove_wait_op(&self.read_source);
Reactor::get().remove_wait_op(&self.write_source);
handle
}
/// Return a reference to the underlying handle.
pub fn get_ref(&self) -> &T {
self.handle.as_ref().unwrap()
}
/// Asynchronously run a read-like operation that will sleep if not ready. The closure to run
/// must return `Result<_, T::ReadError>`, and should return `Err(T::READ_WOULD_BLOCK)` if
/// the operation is not ready.
pub async fn read_with<R>(
&self,
op: impl FnMut(&T) -> Result<R, T::ReadError>,
) -> Result<R, T::ReadError> {
let mut op = op;
loop {
let sleep_op = self.get_ref().setup_read_sleep();
match op(self.get_ref()) {
Err(e) if e == T::READ_WOULD_BLOCK => {}
res => return res,
}
self.read_source.runnable(sleep_op).await;
}
}
/// Asynchronously run a write-like operation that will sleep if not ready. The closure to run
/// must return `Result<_, T::WriteError>`, and should return `Err(T::WRITE_WOULD_BLOCK)` if
/// the operation is not ready.
pub async fn write_with<R>(
&self,
op: impl FnMut(&T) -> Result<R, T::WriteError>,
) -> Result<R, T::WriteError> {
let mut op = op;
loop {
let sleep_op = self.get_ref().setup_write_sleep();
match op(self.get_ref()) {
Err(e) if e == T::WRITE_WOULD_BLOCK => {}
res => return res,
}
self.write_source.runnable(sleep_op).await;
}
}
}
impl<T> Drop for AsyncDuplex<T> {
fn drop(&mut self) {
if self.handle.is_some() {
let _ = Reactor::get().remove_wait_op(&self.read_source);
let _ = Reactor::get().remove_wait_op(&self.write_source);
self.handle.take();
}
}
}