twizzler_driver/device/
events.rs

1//! Manage events for a device, including mailbox messages and interrupts.
2
3use std::{
4    collections::VecDeque,
5    io::ErrorKind,
6    pin::Pin,
7    sync::{atomic::Ordering, Arc, Mutex},
8};
9
10use async_io::Async;
11use futures::future::select_all;
12use twizzler_abi::{
13    device::{
14        BusType, DeviceInterruptFlags, DeviceRepr, InterruptVector, MailboxPriority,
15        NUM_DEVICE_INTERRUPTS,
16    },
17    syscall::{ThreadSyncFlags, ThreadSyncReference, ThreadSyncSleep},
18};
19use twizzler_futures::TwizzlerWaitable;
20use twizzler_rt_abi::{
21    error::{ResourceError, TwzError},
22    Result,
23};
24
25use super::Device;
26
27struct DeviceEventStreamInner {
28    msg_queue: Vec<VecDeque<u64>>,
29}
30
31impl DeviceEventStreamInner {
32    fn new() -> Self {
33        Self {
34            msg_queue: (0..(MailboxPriority::Num as usize))
35                .into_iter()
36                .map(|_| VecDeque::new())
37                .collect(),
38        }
39    }
40}
41
42struct IntInner {
43    inum: usize,
44    repr: Arc<Device>,
45}
46
47impl IntInner {
48    fn repr(&self) -> &DeviceRepr {
49        self.repr.repr()
50    }
51
52    fn new(repr: Arc<Device>, inum: usize) -> Self {
53        Self { inum, repr }
54    }
55}
56
57impl TwizzlerWaitable for IntInner {
58    fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
59        let repr = self.repr();
60        (repr.setup_interrupt_sleep(self.inum), false)
61    }
62
63    fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
64        let repr = self.repr();
65        (repr.setup_interrupt_sleep(self.inum), false)
66    }
67}
68
69struct MailboxInner {
70    repr: Arc<Device>,
71    inum: usize,
72}
73
74impl Unpin for MailboxInner {}
75impl Unpin for IntInner {}
76
77impl MailboxInner {
78    fn repr(&self) -> &DeviceRepr {
79        self.repr.repr()
80    }
81
82    fn new(repr: Arc<Device>, inum: usize) -> Self {
83        Self { inum, repr }
84    }
85}
86
87impl TwizzlerWaitable for MailboxInner {
88    fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
89        (
90            ThreadSyncSleep::new(
91                ThreadSyncReference::Virtual(&self.repr().mailboxes[self.inum]),
92                0,
93                twizzler_abi::syscall::ThreadSyncOp::Equal,
94                ThreadSyncFlags::empty(),
95            ),
96            false,
97        )
98    }
99
100    fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
101        (
102            ThreadSyncSleep::new(
103                ThreadSyncReference::Virtual(&self.repr().mailboxes[self.inum]),
104                0,
105                twizzler_abi::syscall::ThreadSyncOp::Equal,
106                ThreadSyncFlags::empty(),
107            ),
108            false,
109        )
110    }
111}
112
113/// A manager for device events, including interrupt handling.
114pub struct DeviceEventStream {
115    inner: Mutex<DeviceEventStreamInner>,
116    asyncs: Vec<Async<Pin<Box<IntInner>>>>,
117    async_mb: Vec<Async<Pin<Box<MailboxInner>>>>,
118    device: Arc<Device>,
119}
120
121/// A handle for an allocated interrupt on a device.
122pub struct InterruptInfo {
123    es: Arc<DeviceEventStream>,
124    _vec: InterruptVector,
125    devint: u32,
126    inum: usize,
127}
128
129impl InterruptInfo {
130    /// Wait until the next interrupt occurs.
131    pub async fn next(&self) -> Option<u64> {
132        self.es.next(self.inum).await
133    }
134
135    /// Get the interrupt number for programming the device.
136    pub fn devint(&self) -> u32 {
137        self.devint
138    }
139}
140
141impl Drop for InterruptInfo {
142    fn drop(&mut self) {
143        self.es.free_interrupt(self)
144    }
145}
146
147impl DeviceEventStream {
148    pub(crate) fn free_interrupt(&self, _ii: &InterruptInfo) {
149        // TODO
150    }
151
152    /// Allocate a new interrupt on this device.
153    pub(crate) fn allocate_interrupt(self: &Arc<Self>) -> Result<InterruptInfo> {
154        // SAFETY: We grab ownership of the interrupt repr data via the atomic swap.
155        for i in 0..NUM_DEVICE_INTERRUPTS {
156            if self.device.repr().interrupts[i]
157                .taken
158                .swap(1, std::sync::atomic::Ordering::SeqCst)
159                == 0
160            {
161                let (vec, devint) = match self.device.bus_type() {
162                    BusType::Pcie => self.device.allocate_interrupt(i)?,
163                    _ => return Err(TwzError::NOT_SUPPORTED),
164                };
165                self.device
166                    .repr_mut()
167                    .register_interrupt(i, vec, DeviceInterruptFlags::empty());
168                return Ok(InterruptInfo {
169                    es: self.clone(),
170                    _vec: vec,
171                    devint,
172                    inum: i,
173                });
174            }
175        }
176        Err(TwzError::Resource(ResourceError::OutOfResources))
177    }
178
179    pub(crate) fn new(device: Arc<Device>) -> Self {
180        let asyncs = (0..NUM_DEVICE_INTERRUPTS)
181            .into_iter()
182            .map(|i| Async::new_pin(IntInner::new(device.clone(), i)).unwrap())
183            .collect();
184        let async_mb = (0..(MailboxPriority::Num as usize))
185            .into_iter()
186            .map(|i| Async::new_pin(MailboxInner::new(device.clone(), i)).unwrap())
187            .collect();
188        Self {
189            inner: Mutex::new(DeviceEventStreamInner::new()),
190            asyncs,
191            async_mb,
192            device,
193        }
194    }
195
196    fn repr(&self) -> &DeviceRepr {
197        self.device.repr()
198    }
199
200    pub(crate) fn check_mailbox(&self, pri: MailboxPriority) -> Option<u64> {
201        let mut inner = self.inner.lock().unwrap();
202        inner.msg_queue[pri as usize].pop_front()
203    }
204
205    fn future_of_int(
206        &self,
207        inum: usize,
208    ) -> impl std::future::Future<Output = std::io::Result<(usize, u64)>> + '_ {
209        Box::pin(self.asyncs[inum].read_with(move |ii| {
210            ii.repr()
211                .check_for_interrupt(ii.inum)
212                .ok_or(ErrorKind::WouldBlock.into())
213                .map(|x| (inum, x))
214        }))
215    }
216
217    fn future_of_mb(
218        &self,
219        inum: usize,
220    ) -> impl std::future::Future<Output = std::io::Result<(usize, u64)>> + '_ {
221        Box::pin(self.async_mb[inum].read_with(move |ii| {
222            ii.repr()
223                .check_for_mailbox(ii.inum)
224                .ok_or(ErrorKind::WouldBlock.into())
225                .map(|x| (inum, x))
226        }))
227    }
228
229    fn check_add_msg(&self, i: usize) {
230        if let Some(x) = self.repr().check_for_mailbox(i) {
231            self.inner.lock().unwrap().msg_queue[i].push_back(x)
232        }
233    }
234
235    pub(crate) async fn next(&self, int: usize) -> Option<u64> {
236        if self.repr().interrupts[int].taken.load(Ordering::SeqCst) == 0 {
237            return None;
238        }
239        if let Some(x) = self.repr().check_for_interrupt(int) {
240            return Some(x);
241        }
242
243        let fut = self.future_of_int(int);
244        fut.await.ok().map(|x| x.1)
245    }
246
247    pub(crate) async fn next_msg(&self, min: MailboxPriority) -> (MailboxPriority, u64) {
248        loop {
249            for i in 0..(MailboxPriority::Num as usize) {
250                self.check_add_msg(i);
251            }
252
253            for i in ((min as usize)..(MailboxPriority::Num as usize)).rev() {
254                if let Some(x) = self.check_mailbox(i.try_into().unwrap()) {
255                    return (i.try_into().unwrap(), x);
256                }
257            }
258
259            let futs = ((min as usize)..(MailboxPriority::Num as usize))
260                .into_iter()
261                .map(|i| self.future_of_mb(i));
262
263            let (pri, x) = select_all(futs).await.0.unwrap();
264            self.inner.lock().unwrap().msg_queue[pri].push_back(x);
265        }
266    }
267}