twizzler_queue/
sender_queue.rs1use std::{
2 collections::BTreeMap,
3 future::Future,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicU32, Ordering},
7 Arc, Mutex,
8 },
9 task::{Poll, Waker},
10};
11
12use async_io::Async;
13use futures::FutureExt;
14use twizzler_queue_raw::{QueueError, ReceiveFlags, SubmissionFlags};
15
16use crate::Queue;
17
18struct QueueSenderInner<S, C> {
19 queue: Queue<S, C>,
20}
21
22struct WaitPoint<C> {
23 item: Option<(u32, C)>,
24 waker: Option<Waker>,
25}
26
27struct WaitPointFuture<'a, S: Copy + Send + Sync, C: Copy + Send + Sync> {
28 state: Arc<Mutex<WaitPoint<C>>>,
29 sender: &'a QueueSender<S, C>,
30}
31
32impl<'a, S: Copy + Send + Sync + 'static, C: Copy + Send + Sync + 'static> Future
33 for WaitPointFuture<'a, S, C>
34{
35 type Output = Result<(u32, C), QueueError>;
36
37 fn poll(
38 self: std::pin::Pin<&mut Self>,
39 cx: &mut std::task::Context<'_>,
40 ) -> std::task::Poll<Self::Output> {
41 if let Some((id, item)) = self.sender.poll_completions() {
42 self.sender.handle_completion(id, item);
43 }
44 let mut state = self.state.lock().unwrap();
45 if let Some(item) = state.item.take() {
46 Poll::Ready(Ok(item))
47 } else {
48 state.waker = Some(cx.waker().clone());
49 Poll::Pending
50 }
51 }
52}
53
54pub struct QueueSender<S: Copy, C: Copy> {
60 counter: AtomicU32,
61 reuse: Mutex<Vec<u32>>,
62 inner: Async<Pin<Box<QueueSenderInner<S, C>>>>,
63 calls: Mutex<BTreeMap<u32, Arc<Mutex<WaitPoint<C>>>>>,
64}
65
66impl<S: Copy, C: Copy> twizzler_futures::TwizzlerWaitable for QueueSenderInner<S, C> {
67 fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
68 (
69 self.queue.setup_read_com_sleep(),
70 self.queue.has_pending_completion(),
71 )
72 }
73
74 fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
75 (
76 self.queue.setup_write_sub_sleep(),
77 self.queue.has_sub_space(),
78 )
79 }
80}
81
82impl<S: Copy + Sync + Send + 'static, C: Copy + Send + Sync + 'static> QueueSender<S, C> {
83 pub fn new(queue: Queue<S, C>) -> Self {
85 Self {
86 counter: AtomicU32::new(0),
87 reuse: Mutex::new(vec![]),
88 inner: Async::new_pin(QueueSenderInner { queue }).unwrap(),
89 calls: Mutex::new(BTreeMap::new()),
90 }
91 }
92
93 fn next_id(&self) -> u32 {
94 let mut reuse = self.reuse.lock().unwrap();
95 reuse
96 .pop()
97 .unwrap_or_else(|| self.counter.fetch_add(1, Ordering::SeqCst))
98 }
99
100 pub unsafe fn release_id(&self, id: u32) {
101 self.reuse.lock().unwrap().push(id)
102 }
103
104 pub fn poll_completions(&self) -> Option<(u32, C)> {
105 self.inner
106 .get_ref()
107 .queue
108 .get_completion(ReceiveFlags::NON_BLOCK)
109 .ok()
110 }
111
112 pub fn wait_for_completion(&self) -> Option<(u32, C)> {
113 self.inner
114 .get_ref()
115 .queue
116 .get_completion(ReceiveFlags::empty())
117 .ok()
118 }
119
120 fn handle_completion(&self, id: u32, item: C) {
121 let mut calls = self.calls.lock().unwrap();
122 let call = calls
123 .remove(&id)
124 .expect("failed to find registered callback");
125 let mut call = call.lock().unwrap();
126 call.item = Some((id, item));
127 if let Some(waker) = call.waker.take() {
128 waker.wake();
129 }
130 }
131
132 pub fn submit_no_wait(&self, item: S, flags: SubmissionFlags) {
137 let _ = self
138 .inner
139 .get_ref()
140 .queue
141 .submit(self.next_id(), item, flags);
142 }
143
144 pub async fn submit_and_wait(&self, item: S) -> Result<C, std::io::Error> {
146 let id = self.next_id();
147 let state = Arc::new(Mutex::new(WaitPoint::<C> {
148 item: None,
149 waker: None,
150 }));
151 {
152 let mut calls = self.calls.lock().unwrap();
153 calls.insert(id, state.clone());
154 drop(calls);
155 }
156 if let Some((id, item)) = self.poll_completions() {
157 self.handle_completion(id, item);
158 }
159 self.inner
160 .write_with(|inner| {
161 inner
162 .queue
163 .submit(id, item, SubmissionFlags::NON_BLOCK)
164 .map_err(|e| e.into())
165 })
166 .await?;
167
168 let waiter = WaitPointFuture::<S, C> {
169 state,
170 sender: self,
171 };
172 let mut item = Box::pin(async { waiter.await }).fuse();
173 let mut recv = Box::pin(async {
174 loop {
175 let (id, item) = self
176 .inner
177 .read_with(|inner| {
178 inner
179 .queue
180 .get_completion(ReceiveFlags::NON_BLOCK)
181 .map_err(|e| e.into())
182 })
183 .await
184 .unwrap();
185 self.handle_completion(id, item);
186 }
187 })
188 .fuse();
189 let result = futures::select! {
190 item_res = item => item_res,
191 recv_res = recv => recv_res,
192 }?;
193 unsafe { self.release_id(id) };
194 Ok(result.1)
195 }
196}