1use std::{
4 collections::VecDeque,
5 io::ErrorKind,
6 pin::Pin,
7 sync::{atomic::Ordering, Arc, Mutex},
8};
9
10use async_io::Async;
11use futures::future::select_all;
12use twizzler_abi::{
13 device::{
14 BusType, DeviceInterruptFlags, DeviceRepr, InterruptVector, MailboxPriority,
15 NUM_DEVICE_INTERRUPTS,
16 },
17 syscall::{ThreadSyncFlags, ThreadSyncReference, ThreadSyncSleep},
18};
19use twizzler_futures::TwizzlerWaitable;
20use twizzler_rt_abi::{
21 error::{ResourceError, TwzError},
22 Result,
23};
24
25use super::Device;
26
/// Mutable state of a [`DeviceEventStream`], kept behind that stream's mutex.
struct DeviceEventStreamInner {
    /// Pending mailbox messages: one FIFO queue per mailbox priority, indexed by the
    /// priority converted to `usize`.
    msg_queue: Vec<VecDeque<u64>>,
}
30
31impl DeviceEventStreamInner {
32 fn new() -> Self {
33 Self {
34 msg_queue: (0..(MailboxPriority::Num as usize))
35 .into_iter()
36 .map(|_| VecDeque::new())
37 .collect(),
38 }
39 }
40}
41
/// Waitable handle for one interrupt slot of a device.
struct IntInner {
    /// Index of the interrupt slot this handle waits on.
    inum: usize,
    /// Device whose representation holds the interrupt slot.
    repr: Arc<Device>,
}
46
47impl IntInner {
48 fn repr(&self) -> &DeviceRepr {
49 self.repr.repr()
50 }
51
52 fn new(repr: Arc<Device>, inum: usize) -> Self {
53 Self { inum, repr }
54 }
55}
56
57impl TwizzlerWaitable for IntInner {
58 fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
59 let repr = self.repr();
60 (repr.setup_interrupt_sleep(self.inum), false)
61 }
62
63 fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
64 let repr = self.repr();
65 (repr.setup_interrupt_sleep(self.inum), false)
66 }
67}
68
/// Waitable handle for one mailbox of a device.
struct MailboxInner {
    /// Device whose representation holds the mailbox.
    repr: Arc<Device>,
    /// Index of the mailbox within the device representation's `mailboxes`.
    inum: usize,
}
73
// Explicitly mark both wait handles as `Unpin`; `DeviceEventStream` stores them as
// `Pin<Box<_>>` inside `Async`.
impl Unpin for MailboxInner {}
impl Unpin for IntInner {}
76
77impl MailboxInner {
78 fn repr(&self) -> &DeviceRepr {
79 self.repr.repr()
80 }
81
82 fn new(repr: Arc<Device>, inum: usize) -> Self {
83 Self { inum, repr }
84 }
85}
86
87impl TwizzlerWaitable for MailboxInner {
88 fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
89 (
90 ThreadSyncSleep::new(
91 ThreadSyncReference::Virtual(&self.repr().mailboxes[self.inum]),
92 0,
93 twizzler_abi::syscall::ThreadSyncOp::Equal,
94 ThreadSyncFlags::empty(),
95 ),
96 false,
97 )
98 }
99
100 fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
101 (
102 ThreadSyncSleep::new(
103 ThreadSyncReference::Virtual(&self.repr().mailboxes[self.inum]),
104 0,
105 twizzler_abi::syscall::ThreadSyncOp::Equal,
106 ThreadSyncFlags::empty(),
107 ),
108 false,
109 )
110 }
111}
112
/// Source of interrupt and mailbox events for a single device.
pub struct DeviceEventStream {
    /// Queued mailbox messages, guarded by a mutex.
    inner: Mutex<DeviceEventStreamInner>,
    /// One async waiter per device interrupt slot (`NUM_DEVICE_INTERRUPTS` entries).
    asyncs: Vec<Async<Pin<Box<IntInner>>>>,
    /// One async waiter per mailbox priority (`MailboxPriority::Num` entries).
    async_mb: Vec<Async<Pin<Box<MailboxInner>>>>,
    /// The device whose events this stream reports.
    device: Arc<Device>,
}
120
/// Handle to an interrupt allocated from a [`DeviceEventStream`].
///
/// Dropping the handle calls back into the stream's `free_interrupt`.
pub struct InterruptInfo {
    /// The event stream this interrupt was allocated from.
    es: Arc<DeviceEventStream>,
    /// Interrupt vector returned by the device when the interrupt was allocated.
    _vec: InterruptVector,
    /// Device-side interrupt number returned by the device allocation.
    devint: u32,
    /// Index of the interrupt slot in the device representation.
    inum: usize,
}
128
129impl InterruptInfo {
130 pub async fn next(&self) -> Option<u64> {
132 self.es.next(self.inum).await
133 }
134
135 pub fn devint(&self) -> u32 {
137 self.devint
138 }
139}
140
141impl Drop for InterruptInfo {
142 fn drop(&mut self) {
143 self.es.free_interrupt(self)
144 }
145}
146
147impl DeviceEventStream {
148 pub(crate) fn free_interrupt(&self, _ii: &InterruptInfo) {
149 }
151
152 pub(crate) fn allocate_interrupt(self: &Arc<Self>) -> Result<InterruptInfo> {
154 for i in 0..NUM_DEVICE_INTERRUPTS {
156 if self.device.repr().interrupts[i]
157 .taken
158 .swap(1, std::sync::atomic::Ordering::SeqCst)
159 == 0
160 {
161 let (vec, devint) = match self.device.bus_type() {
162 BusType::Pcie => self.device.allocate_interrupt(i)?,
163 _ => return Err(TwzError::NOT_SUPPORTED),
164 };
165 self.device
166 .repr_mut()
167 .register_interrupt(i, vec, DeviceInterruptFlags::empty());
168 return Ok(InterruptInfo {
169 es: self.clone(),
170 _vec: vec,
171 devint,
172 inum: i,
173 });
174 }
175 }
176 Err(TwzError::Resource(ResourceError::OutOfResources))
177 }
178
179 pub(crate) fn new(device: Arc<Device>) -> Self {
180 let asyncs = (0..NUM_DEVICE_INTERRUPTS)
181 .into_iter()
182 .map(|i| Async::new_pin(IntInner::new(device.clone(), i)).unwrap())
183 .collect();
184 let async_mb = (0..(MailboxPriority::Num as usize))
185 .into_iter()
186 .map(|i| Async::new_pin(MailboxInner::new(device.clone(), i)).unwrap())
187 .collect();
188 Self {
189 inner: Mutex::new(DeviceEventStreamInner::new()),
190 asyncs,
191 async_mb,
192 device,
193 }
194 }
195
196 fn repr(&self) -> &DeviceRepr {
197 self.device.repr()
198 }
199
200 pub(crate) fn check_mailbox(&self, pri: MailboxPriority) -> Option<u64> {
201 let mut inner = self.inner.lock().unwrap();
202 inner.msg_queue[pri as usize].pop_front()
203 }
204
205 fn future_of_int(
206 &self,
207 inum: usize,
208 ) -> impl std::future::Future<Output = std::io::Result<(usize, u64)>> + '_ {
209 Box::pin(self.asyncs[inum].read_with(move |ii| {
210 ii.repr()
211 .check_for_interrupt(ii.inum)
212 .ok_or(ErrorKind::WouldBlock.into())
213 .map(|x| (inum, x))
214 }))
215 }
216
217 fn future_of_mb(
218 &self,
219 inum: usize,
220 ) -> impl std::future::Future<Output = std::io::Result<(usize, u64)>> + '_ {
221 Box::pin(self.async_mb[inum].read_with(move |ii| {
222 ii.repr()
223 .check_for_mailbox(ii.inum)
224 .ok_or(ErrorKind::WouldBlock.into())
225 .map(|x| (inum, x))
226 }))
227 }
228
229 fn check_add_msg(&self, i: usize) {
230 if let Some(x) = self.repr().check_for_mailbox(i) {
231 self.inner.lock().unwrap().msg_queue[i].push_back(x)
232 }
233 }
234
235 pub(crate) async fn next(&self, int: usize) -> Option<u64> {
236 if self.repr().interrupts[int].taken.load(Ordering::SeqCst) == 0 {
237 return None;
238 }
239 if let Some(x) = self.repr().check_for_interrupt(int) {
240 return Some(x);
241 }
242
243 let fut = self.future_of_int(int);
244 fut.await.ok().map(|x| x.1)
245 }
246
247 pub(crate) async fn next_msg(&self, min: MailboxPriority) -> (MailboxPriority, u64) {
248 loop {
249 for i in 0..(MailboxPriority::Num as usize) {
250 self.check_add_msg(i);
251 }
252
253 for i in ((min as usize)..(MailboxPriority::Num as usize)).rev() {
254 if let Some(x) = self.check_mailbox(i.try_into().unwrap()) {
255 return (i.try_into().unwrap(), x);
256 }
257 }
258
259 let futs = ((min as usize)..(MailboxPriority::Num as usize))
260 .into_iter()
261 .map(|i| self.future_of_mb(i));
262
263 let (pri, x) = select_all(futs).await.0.unwrap();
264 self.inner.lock().unwrap().msg_queue[pri].push_back(x);
265 }
266 }
267}