twizzler_queue/
sender_queue.rs

1use std::{
2    collections::BTreeMap,
3    future::Future,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicU32, Ordering},
7        Arc, Mutex,
8    },
9    task::{Poll, Waker},
10};
11
12use async_io::Async;
13use futures::FutureExt;
14use twizzler_queue_raw::{QueueError, ReceiveFlags, SubmissionFlags};
15
16use crate::Queue;
17
18struct QueueSenderInner<S, C> {
19    queue: Queue<S, C>,
20}
21
/// Shared rendezvous state between a task waiting for a completion and whoever receives that
/// completion from the queue.
struct WaitPoint<C> {
    /// The `(id, completion)` pair once it has arrived; `None` while still outstanding.
    item: Option<(u32, C)>,
    /// Waker of the task currently waiting on this point, if it has been polled and parked.
    waker: Option<Waker>,
}
26
27struct WaitPointFuture<'a, S: Copy + Send + Sync, C: Copy + Send + Sync> {
28    state: Arc<Mutex<WaitPoint<C>>>,
29    sender: &'a QueueSender<S, C>,
30}
31
32impl<'a, S: Copy + Send + Sync + 'static, C: Copy + Send + Sync + 'static> Future
33    for WaitPointFuture<'a, S, C>
34{
35    type Output = Result<(u32, C), QueueError>;
36
37    fn poll(
38        self: std::pin::Pin<&mut Self>,
39        cx: &mut std::task::Context<'_>,
40    ) -> std::task::Poll<Self::Output> {
41        if let Some((id, item)) = self.sender.poll_completions() {
42            self.sender.handle_completion(id, item);
43        }
44        let mut state = self.state.lock().unwrap();
45        if let Some(item) = state.item.take() {
46            Poll::Ready(Ok(item))
47        } else {
48            state.waker = Some(cx.waker().clone());
49            Poll::Pending
50        }
51    }
52}
53
54/// An async-supported sending-half of a [Queue]. This is to support systems that want to
55/// asynchronously send items to a receiver, under the assumption that the receiver sends
56/// completions to indicate that a request has been finished, and that the send ID can be reused.
57///
58/// Thus, this queue interally allocates, sends, and reuses IDs for requests.
59pub struct QueueSender<S: Copy, C: Copy> {
60    counter: AtomicU32,
61    reuse: Mutex<Vec<u32>>,
62    inner: Async<Pin<Box<QueueSenderInner<S, C>>>>,
63    calls: Mutex<BTreeMap<u32, Arc<Mutex<WaitPoint<C>>>>>,
64}
65
66impl<S: Copy, C: Copy> twizzler_futures::TwizzlerWaitable for QueueSenderInner<S, C> {
67    fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
68        (
69            self.queue.setup_read_com_sleep(),
70            self.queue.has_pending_completion(),
71        )
72    }
73
74    fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
75        (
76            self.queue.setup_write_sub_sleep(),
77            self.queue.has_sub_space(),
78        )
79    }
80}
81
82impl<S: Copy + Sync + Send + 'static, C: Copy + Send + Sync + 'static> QueueSender<S, C> {
83    /// Build a new QueueSender from a [Queue].
84    pub fn new(queue: Queue<S, C>) -> Self {
85        Self {
86            counter: AtomicU32::new(0),
87            reuse: Mutex::new(vec![]),
88            inner: Async::new_pin(QueueSenderInner { queue }).unwrap(),
89            calls: Mutex::new(BTreeMap::new()),
90        }
91    }
92
93    fn next_id(&self) -> u32 {
94        let mut reuse = self.reuse.lock().unwrap();
95        reuse
96            .pop()
97            .unwrap_or_else(|| self.counter.fetch_add(1, Ordering::SeqCst))
98    }
99
100    pub unsafe fn release_id(&self, id: u32) {
101        self.reuse.lock().unwrap().push(id)
102    }
103
104    pub fn poll_completions(&self) -> Option<(u32, C)> {
105        self.inner
106            .get_ref()
107            .queue
108            .get_completion(ReceiveFlags::NON_BLOCK)
109            .ok()
110    }
111
112    pub fn wait_for_completion(&self) -> Option<(u32, C)> {
113        self.inner
114            .get_ref()
115            .queue
116            .get_completion(ReceiveFlags::empty())
117            .ok()
118    }
119
120    fn handle_completion(&self, id: u32, item: C) {
121        let mut calls = self.calls.lock().unwrap();
122        let call = calls
123            .remove(&id)
124            .expect("failed to find registered callback");
125        let mut call = call.lock().unwrap();
126        call.item = Some((id, item));
127        if let Some(waker) = call.waker.take() {
128            waker.wake();
129        }
130    }
131
132    /// Submit a request and don't wait for a response. WARNING: This will burn a request ID,
133    /// preventing it from ever being reused. This function is mostly useful for signalling an "end
134    /// of communication" event across the queue. If you want to submit and not immediately await,
135    /// you probably should create a task for your async block instead.
136    pub fn submit_no_wait(&self, item: S, flags: SubmissionFlags) {
137        let _ = self
138            .inner
139            .get_ref()
140            .queue
141            .submit(self.next_id(), item, flags);
142    }
143
144    /// Submit an item and await a completion.
145    pub async fn submit_and_wait(&self, item: S) -> Result<C, std::io::Error> {
146        let id = self.next_id();
147        let state = Arc::new(Mutex::new(WaitPoint::<C> {
148            item: None,
149            waker: None,
150        }));
151        {
152            let mut calls = self.calls.lock().unwrap();
153            calls.insert(id, state.clone());
154            drop(calls);
155        }
156        if let Some((id, item)) = self.poll_completions() {
157            self.handle_completion(id, item);
158        }
159        self.inner
160            .write_with(|inner| {
161                inner
162                    .queue
163                    .submit(id, item, SubmissionFlags::NON_BLOCK)
164                    .map_err(|e| e.into())
165            })
166            .await?;
167
168        let waiter = WaitPointFuture::<S, C> {
169            state,
170            sender: self,
171        };
172        let mut item = Box::pin(async { waiter.await }).fuse();
173        let mut recv = Box::pin(async {
174            loop {
175                let (id, item) = self
176                    .inner
177                    .read_with(|inner| {
178                        inner
179                            .queue
180                            .get_completion(ReceiveFlags::NON_BLOCK)
181                            .map_err(|e| e.into())
182                    })
183                    .await
184                    .unwrap();
185                self.handle_completion(id, item);
186            }
187        })
188        .fuse();
189        let result = futures::select! {
190            item_res = item => item_res,
191            recv_res = recv => recv_res,
192        }?;
193        unsafe { self.release_id(id) };
194        Ok(result.1)
195    }
196}