twizzler_queue_raw/
lib.rs

1//! A raw queue interface for Twizzler, making no assumptions about where the underlying headers and
2//! circular buffers are located. This means you probably don't want to use this --- instead, I
3//! suggest you use the wrapped version of this library, twizzler-queue, since that actually
4//! interacts with the object system.
5//!
6//! This library exists to provide an underlying implementation of the concurrent data structure for
7//! each individual raw queue so that this complex code can be reused in both userspace and the
8//! kernel.
9//!
10//! The basic design of a raw queue is two parts:
11//!
12//!   1. A header, which contains things like head pointers, tail pointers, etc.
13//!   2. A buffer, which contains the items that are enqueued.
14//!
15//! The queue is an MPSC lock-free blocking data structure. Any thread may submit to a queue, but
16//! only one thread may receive on that queue at a time. The queue is implemented with a head
17//! pointer, a tail pointer, a doorbell, and a waiters counter. Additionally, the queue is
18//! maintained in terms of "turns", that indicate which "go around" of the queue we are on (mod 2).
19//!
20//! # Let's look at an insert
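//! Here's what the queue looks like to start with. The markers `b`, `t`, and `h` show the
//! positions of the bell, tail, and head. Inside each slot, the digit is the slot's turn bit, `_`
//! means the slot holds no data, and `X` means it holds data. So 0_ indicates that the slot is
//! empty and its turn is set to 0.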
23//! ```text
24//!  b
25//!  t
26//!  h
27//! [0_, 0_, 0_]
28//! ```
29//! When inserting, the thread first reserves space:
30//! ```text
31//!  b
32//!  t
33//!      h
34//! [0_, 0_, 0_]
35//! ```
36//! Then it fills out the data:
37//! ```text
38//!  b
39//!  t
40//!      h
41//! [0X, 0_, 0_]
42//! ```
43//! Then it toggles the turn bit:
44//! ```text
45//!  b
46//!  t
47//!      h
48//! [1X, 0_, 0_]
49//! ```
50//! Next, it bumps the doorbell (and maybe wakes up a waiting consumer):
51//! ```text
52//!      b
53//!  t
54//!      h
55//! [1X, 0_, 0_]
56//! ```
57//!
58//! Now, let's say the consumer comes along and dequeues. First, it checks if it's empty by
59//! comparing tail and bell, and finds it's not empty. Then it checks if it's the correct turn. This
//! turn is 1, so yes. Next, it removes the data from the queue:
61//! ```text
62//!      b
63//!  t
64//!      h
65//! [1_, 0_, 0_]
66//! ```
67//! And then finally it increments the tail counter:
68//! ```text
69//!      b
70//!      t
71//!      h
72//! [1_, 0_, 0_]
73//! ```
74
75#![cfg_attr(test, feature(test))]
76#![cfg_attr(not(any(feature = "std", test)), no_std)]
77
78use core::{
79    cell::UnsafeCell,
80    fmt::Display,
81    marker::PhantomData,
82    ptr::addr_of_mut,
83    sync::atomic::{AtomicU32, AtomicU64, Ordering},
84};
85
/// A queue entry. All queues must be formed of these, as the queue algorithm uses data inside this
/// struct as part of its operation.
#[derive(Clone, Copy, Default, Debug)]
#[repr(C)]
pub struct QueueEntry<T> {
    /// Used internally by the queue algorithm to track the turn. Always read and written through
    /// an atomic view (see `get_cmd_slot` / `set_cmd_slot`).
    cmd_slot: u32,
    /// Tag used by the full queue structure to manage completion.
    info: u32,
    /// The user data passed around the queue.
    data: T,
}

impl<T> QueueEntry<T> {
    /// Construct a new QueueEntry. The `info` tag should be used to inform completion events in the
    /// full queue. The internal `cmd_slot` starts out as zero.
    pub fn new(info: u32, item: T) -> Self {
        let cmd_slot = 0;
        Self {
            cmd_slot,
            info,
            data: item,
        }
    }

    /// Get the info tag of a QueueEntry.
    #[inline]
    pub fn info(&self) -> u32 {
        self.info
    }

    /// Get the data item of a QueueEntry, consuming the entry.
    #[inline]
    pub fn item(self) -> T {
        self.data
    }

    /// Borrow `cmd_slot` as an atomic so it can be accessed with sequentially-consistent
    /// loads and stores.
    #[inline]
    fn cmd_slot_atomic(&self) -> &AtomicU32 {
        let raw = core::ptr::addr_of!(self.cmd_slot).cast_mut();
        // SAFETY: `raw` is derived from a live reference, so it is non-null and aligned for `u32`,
        // and the returned borrow cannot outlive `self`.
        // NOTE(review): stores through this view mutate a plain `u32` field behind `&self`; this
        // relies on every concurrent access to `cmd_slot` going through this atomic view.
        unsafe { AtomicU32::from_ptr(raw) }
    }

    /// Atomically load the current `cmd_slot` value.
    #[inline]
    fn get_cmd_slot(&self) -> u32 {
        self.cmd_slot_atomic().load(Ordering::SeqCst)
    }

    /// Atomically store `v` into `cmd_slot`.
    #[inline]
    fn set_cmd_slot(&self, v: u32) {
        self.cmd_slot_atomic().store(v, Ordering::SeqCst);
    }
}
133
/// The base info structure stored in a Twizzler queue object. Used to open Twizzler queue objects
/// and create a Queue.
///
/// The type parameters `S` and `C` are carried only as markers (through `PhantomData`); no values
/// of those types are stored in this struct.
// NOTE(review): nothing in this file reads these fields. The names suggest they locate the
// submission (`sub_*`) and completion (`com_*`) headers and buffers within the object -- confirm
// against the twizzler-queue crate before relying on that.
#[repr(C)]
pub struct QueueBase<S, C> {
    /// Presumably the location of the submission queue header -- TODO confirm.
    pub sub_hdr: usize,
    /// Presumably the location of the completion queue header -- TODO confirm.
    pub com_hdr: usize,
    /// Presumably the location of the submission queue entry buffer -- TODO confirm.
    pub sub_buf: usize,
    /// Presumably the location of the completion queue entry buffer -- TODO confirm.
    pub com_buf: usize,
    /// Zero-sized marker tying the struct to its `S` and `C` type parameters.
    _pd: PhantomData<(S, C)>,
}
144
#[repr(C)]
/// A raw queue header. This contains all the necessary counters and info to run the queue
/// algorithm.
pub struct RawQueueHdr {
    /// Base-2 logarithm of the number of entries: the queue holds `1 << l2len` entries.
    l2len: usize,
    /// Size of a single entry in bytes; only used to compute `len_bytes`.
    stride: usize,
    /// Position of the next slot a submitter will reserve. Advanced by compare-exchange in
    /// `reserve_slot`; only the low 31 bits are used as a position.
    head: AtomicU32,
    /// Number of submitters that have registered themselves as waiting for free space.
    waiters: AtomicU32,
    /// Doorbell: incremented once for every completed submission. The consumer waits on this
    /// word when the queue has nothing ready.
    bell: AtomicU64,
    /// Low 31 bits: position of the next entry to receive. Bit 31: set while the consumer has
    /// flagged itself as waiting. Submitters wait on this word when the queue is full.
    tail: AtomicU64,
}
156
157impl RawQueueHdr {
158    /// Construct a new raw queue header.
159    pub fn new(l2len: usize, stride: usize) -> Self {
160        Self {
161            l2len,
162            stride,
163            head: AtomicU32::new(0),
164            waiters: AtomicU32::new(0),
165            bell: AtomicU64::new(0),
166            tail: AtomicU64::new(0),
167        }
168    }
169
170    pub fn len_bytes(&self) -> usize {
171        self.len() * self.stride
172    }
173
174    #[inline]
175    pub fn len(&self) -> usize {
176        1 << self.l2len
177    }
178
179    #[inline]
180    fn is_full(&self, h: u32, t: u64) -> bool {
181        (h & 0x7fffffff) as u64 - (t & 0x7fffffff) >= self.len() as u64
182    }
183
184    #[inline]
185    fn is_empty(&self, bell: u64, tail: u64) -> bool {
186        (bell & 0x7fffffff) == (tail & 0x7fffffff)
187    }
188
189    #[inline]
190    fn is_turn<T>(&self, t: u64, item: *const QueueEntry<T>) -> bool {
191        let turn = (t / (self.len() as u64)) % 2;
192        let val = unsafe { &*item }.get_cmd_slot() >> 31;
193        (val == 0) == (turn == 1)
194    }
195
196    #[inline]
197    fn consumer_waiting(&self) -> bool {
198        (self.tail.load(Ordering::SeqCst) & (1 << 31)) != 0
199    }
200
201    #[inline]
202    fn submitter_waiting(&self) -> bool {
203        self.waiters.load(Ordering::SeqCst) > 0
204    }
205
206    #[inline]
207    fn consumer_set_waiting(&self, waiting: bool) {
208        if waiting {
209            self.tail.fetch_or(1 << 31, Ordering::SeqCst);
210        } else {
211            self.tail.fetch_and(!(1 << 31), Ordering::SeqCst);
212        }
213    }
214
215    #[inline]
216    fn inc_submit_waiting(&self) {
217        self.waiters.fetch_add(1, Ordering::SeqCst);
218    }
219
220    #[inline]
221    fn dec_submit_waiting(&self) {
222        self.waiters.fetch_sub(1, Ordering::SeqCst);
223    }
224
225    #[inline]
226    fn reserve_slot<W: Fn(&AtomicU64, u64)>(
227        &self,
228        flags: SubmissionFlags,
229        wait: W,
230    ) -> Result<u32, QueueError> {
231        let mut waiter = false;
232        let mut attempts = 1000;
233        let h = loop {
234            let h = self.head.load(Ordering::SeqCst);
235            let t = self.tail.load(Ordering::SeqCst);
236            if !self.is_full(h, t) {
237                if self
238                    .head
239                    .compare_exchange(h, h + 1, Ordering::SeqCst, Ordering::SeqCst)
240                    .is_ok()
241                {
242                    break h;
243                } else {
244                    core::hint::spin_loop();
245                    continue;
246                }
247            }
248
249            if flags.contains(SubmissionFlags::NON_BLOCK) {
250                return Err(QueueError::WouldBlock);
251            }
252
253            if attempts != 0 {
254                attempts -= 1;
255                core::hint::spin_loop();
256                continue;
257            }
258
259            if !waiter {
260                waiter = true;
261                self.inc_submit_waiting();
262            }
263
264            let t = self.tail.load(Ordering::SeqCst);
265            if self.is_full(h, t) {
266                wait(&self.tail, t);
267            }
268        };
269
270        if waiter {
271            self.dec_submit_waiting();
272        }
273
274        Ok(h & 0x7fffffff)
275    }
276
277    #[inline]
278    fn get_turn(&self, h: u32) -> bool {
279        (h / self.len() as u32) % 2 == 0
280    }
281
282    #[inline]
283    fn ring<R: Fn(&AtomicU64)>(&self, ring: R) {
284        self.bell.fetch_add(1, Ordering::SeqCst);
285        if self.consumer_waiting() {
286            ring(&self.bell)
287        }
288    }
289
290    pub fn has_pending<T>(&self, raw_buf: *const QueueEntry<T>) -> bool {
291        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
292        let b = self.bell.load(Ordering::SeqCst);
293        let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
294        !self.is_empty(b, t) && self.is_turn(t, item)
295    }
296
297    pub fn has_space<T>(&self) -> bool {
298        let h = self.head.load(Ordering::SeqCst);
299        let t = self.tail.load(Ordering::SeqCst);
300        !self.is_full(h, t)
301    }
302
303    #[inline]
304    fn get_next_ready<W: Fn(&AtomicU64, u64), T>(
305        &self,
306        wait: W,
307        flags: ReceiveFlags,
308        raw_buf: *const QueueEntry<T>,
309    ) -> Result<u64, QueueError> {
310        let mut attempts = 1000;
311        let t = loop {
312            let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
313            let b = self.bell.load(Ordering::SeqCst);
314            let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
315
316            if !self.is_empty(b, t) && self.is_turn(t, item) {
317                break t;
318            }
319
320            if flags.contains(ReceiveFlags::NON_BLOCK) {
321                return Err(QueueError::WouldBlock);
322            }
323
324            if attempts != 0 {
325                attempts -= 1;
326                core::hint::spin_loop();
327                continue;
328            }
329
330            self.consumer_set_waiting(true);
331            let b = self.bell.load(Ordering::SeqCst);
332            if self.is_empty(b, t) || !self.is_turn(t, item) {
333                wait(&self.bell, b);
334            }
335        };
336
337        if attempts == 0 {
338            self.consumer_set_waiting(false);
339        }
340        Ok(t)
341    }
342
343    fn setup_rec_sleep_simple(&self) -> (&AtomicU64, u64) {
344        // TODO: an interface that undoes this.
345        self.consumer_set_waiting(true);
346        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
347        (&self.bell, t)
348    }
349
350    fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
351        // TODO: an interface that undoes this.
352        self.submitter_waiting();
353        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
354        let h = self.head.load(Ordering::SeqCst) & 0x7fffffff;
355        if self.is_full(h, t) {
356            (&self.tail, t)
357        } else {
358            (&self.tail, u64::MAX)
359        }
360    }
361
362    fn setup_rec_sleep<'a, T>(
363        &'a self,
364        sleep: bool,
365        raw_buf: *const QueueEntry<T>,
366        waiter: &mut (Option<&'a AtomicU64>, u64),
367    ) -> Result<u64, QueueError> {
368        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
369        let b = self.bell.load(Ordering::SeqCst);
370        let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
371        *waiter = (Some(&self.bell), b);
372        if self.is_empty(b, t) || !self.is_turn(t, item) {
373            if sleep {
374                self.consumer_set_waiting(true);
375                let b = self.bell.load(Ordering::SeqCst);
376                *waiter = (Some(&self.bell), b);
377                if !self.is_empty(b, t) && self.is_turn(t, item) {
378                    return Ok(t);
379                }
380            }
381            Err(QueueError::WouldBlock)
382        } else {
383            Ok(t)
384        }
385    }
386
387    #[inline]
388    fn advance_tail<R: Fn(&AtomicU64)>(&self, ring: R) {
389        let t = self.tail.load(Ordering::SeqCst);
390        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
391        if self.submitter_waiting() {
392            ring(&self.tail);
393        }
394    }
395
396    #[inline]
397    fn advance_tail_setup<'a>(&'a self, ringer: &mut Option<&'a AtomicU64>) {
398        let t = self.tail.load(Ordering::SeqCst);
399        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
400        if self.submitter_waiting() {
401            *ringer = Some(&self.tail);
402        }
403    }
404}
405
/// A raw queue, comprising a header to track the algorithm and a buffer to hold queue entries.
///
/// The queue does not own either part: it only stores raw pointers to them, which the caller of
/// `RawQueue::new` must keep valid.
pub struct RawQueue<T> {
    /// Pointer to the header holding the queue's counters.
    hdr: *const RawQueueHdr,
    /// Pointer to the first entry of the entry buffer.
    buf: UnsafeCell<*mut QueueEntry<T>>,
}
411
bitflags::bitflags! {
    /// Flags to control how queue submission works.
    pub struct SubmissionFlags: u32 {
        /// If the request would block, return Err([QueueError::WouldBlock]) instead.
        const NON_BLOCK = 1;
    }

    /// Flags to control how queue receive works.
    pub struct ReceiveFlags: u32 {
        /// If the request would block, return Err([QueueError::WouldBlock]) instead.
        const NON_BLOCK = 1;
    }
}
425
/// Possible errors returned by queue operations (both submitting and receiving).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum QueueError {
    /// An unknown error.
    Unknown,
    /// The operation would have blocked, and non-blocking operation was specified.
    WouldBlock,
}

impl Display for QueueError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let text = match *self {
            Self::Unknown => "unknown",
            Self::WouldBlock => "would block",
        };
        f.write_str(text)
    }
}

impl core::error::Error for QueueError {}
445
#[cfg(feature = "std")]
impl From<QueueError> for std::io::Error {
    /// Map a queue error onto the closest `std::io::ErrorKind`: `WouldBlock` stays `WouldBlock`,
    /// everything else becomes `Other`.
    fn from(err: QueueError) -> Self {
        use std::io::ErrorKind;

        let kind = match err {
            QueueError::WouldBlock => ErrorKind::WouldBlock,
            QueueError::Unknown => ErrorKind::Other,
        };
        Self::from(kind)
    }
}
455
456impl<T: Copy> RawQueue<T> {
457    /// Construct a new raw queue out of a header reference and a buffer pointer.
458    /// # Safety
459    /// The caller must ensure that hdr and buf point to valid objects, and that the lifetime of the
460    /// RawQueue is exceeded by the objects pointed to.
461    pub unsafe fn new(hdr: *const RawQueueHdr, buf: *mut QueueEntry<T>) -> Self {
462        Self {
463            hdr,
464            buf: UnsafeCell::new(buf),
465        }
466    }
467
468    #[inline]
469    pub fn hdr(&self) -> &RawQueueHdr {
470        unsafe { &*self.hdr }
471    }
472
473    #[inline]
474    fn get_buf(&self, off: usize) -> *mut QueueEntry<T> {
475        unsafe { (*self.buf.get()).add(off & (self.hdr().len() - 1)) }
476    }
477
478    /// Submit a data item of type T, wrapped in a QueueEntry, to the queue. The two callbacks,
479    /// wait, and ring, are for implementing a rudimentary condvar, wherein if the queue needs to
480    /// block, we'll call wait(x, y), where we are supposed to wait until *x != y. Once we are done
481    /// inserting, if we need to wake up a consumer, we will call ring, which should wake up anyone
482    /// waiting on that word of memory.
483    pub fn submit<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
484        &self,
485        item: QueueEntry<T>,
486        wait: W,
487        ring: R,
488        flags: SubmissionFlags,
489    ) -> Result<(), QueueError> {
490        let h = self.hdr().reserve_slot(flags, wait)?;
491        let buf_item = self.get_buf(h as usize);
492
493        // Write these manually, to ensure we do not set cmd_slot until the end.
494        unsafe { *addr_of_mut!((*buf_item).data) = item.data };
495        unsafe { *addr_of_mut!((*buf_item).info) = item.info };
496        let turn = self.hdr().get_turn(h);
497        unsafe {
498            addr_of_mut!(*buf_item)
499                .as_ref()
500                .unwrap()
501                .set_cmd_slot(h | if turn { 1u32 << 31 } else { 0 })
502        };
503
504        self.hdr().ring(ring);
505        Ok(())
506    }
507
508    /// Receive data from the queue, returning either that data or an error. The wait and ring
509    /// callbacks work similar to [RawQueue::submit].
510    pub fn receive<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
511        &self,
512        wait: W,
513        ring: R,
514        flags: ReceiveFlags,
515    ) -> Result<QueueEntry<T>, QueueError> {
516        let t = self
517            .hdr()
518            .get_next_ready(wait, flags, unsafe { *self.buf.get() })?;
519        let buf_item = self.get_buf(t as usize);
520        let item = unsafe { buf_item.read() };
521        self.hdr().advance_tail(ring);
522        Ok(item)
523    }
524
525    pub fn has_pending(&self) -> bool {
526        self.hdr().has_pending(unsafe { *self.buf.get() })
527    }
528
529    pub fn has_space(&self) -> bool {
530        self.hdr().has_space::<T>()
531    }
532
533    pub fn setup_sleep<'a>(
534        &'a self,
535        sleep: bool,
536        output: &mut Option<QueueEntry<T>>,
537        waiter: &mut (Option<&'a AtomicU64>, u64),
538        ringer: &mut Option<&'a AtomicU64>,
539    ) -> Result<(), QueueError> {
540        let t = self
541            .hdr()
542            .setup_rec_sleep(sleep, unsafe { *self.buf.get() }, waiter)?;
543        let buf_item = self.get_buf(t as usize);
544        let item = unsafe { buf_item.read() };
545        *output = Some(item);
546        self.hdr().advance_tail_setup(ringer);
547        Ok(())
548    }
549
550    #[inline]
551    pub fn setup_sleep_simple(&self) -> (&AtomicU64, u64) {
552        self.hdr().setup_rec_sleep_simple()
553    }
554
555    #[inline]
556    pub fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
557        self.hdr().setup_send_sleep_simple()
558    }
559}
560
// `RawQueue` stores raw pointers, which are neither `Send` nor `Sync` automatically, so both traits
// are opted into by hand for any `T: Send`.
// NOTE(review): `receive` takes `&self`, so the type system does not enforce the
// "only one thread may receive at a time" rule from the module docs; the soundness of `Sync`
// presumably relies on callers honoring it -- confirm.
unsafe impl<T: Send> Send for RawQueue<T> {}
unsafe impl<T: Send> Sync for RawQueue<T> {}
563
#[cfg(any(feature = "std", test))]
/// Wait for receiving on multiple raw queues. Every queue that can return data does so by writing
/// it into `output` at the same index the queue has in `queues`; the two slices must be the same
/// length, otherwise Err([QueueError::Unknown]) is returned. On success the number of queues that
/// produced an entry is returned.
///
/// If no data is available in any queue, the function polls a bounded number of times and then
/// calls back on multi_wait, which it expects to wait until **any** of the pairs (&x, y) meet the
/// condition that *x != y. Before returning any data, the function calls back on multi_ring, to
/// inform multiple queues that data was taken from them. It expects the multi_ring function to
/// wake up any waiting threads on the supplied words of memory.
///
/// Note that both call backs specify the pointers as Option. In the case that an entry is None,
/// there was no requested wait or wake operation for that queue, and that entry should be ignored.
///
/// If flags specifies [ReceiveFlags::NON_BLOCK], then if no data is available, the function returns
/// immediately with Err([QueueError::WouldBlock]).
///
/// # Rationale
/// This function is here to implement poll or select like functionality, wherein a given thread or
/// program wants to wait on multiple incoming request channels and handle them itself, thus cutting
/// down on the number of threads required. The maximum number of queues to use here is a trade-off
/// --- more means fewer threads, but since this function is linear in the number of queues, each
/// thread could take longer to service requests.
///
/// The complexity of the multi_wait and multi_ring callbacks is present to avoid calling into the
/// kernel often for high-contention queues.
pub fn multi_receive<T: Copy, W: Fn(&[(Option<&AtomicU64>, u64)]), R: Fn(&[Option<&AtomicU64>])>(
    queues: &[&RawQueue<T>],
    output: &mut [Option<QueueEntry<T>>],
    multi_wait: W,
    multi_ring: R,
    flags: ReceiveFlags,
) -> Result<usize, QueueError> {
    if queues.len() != output.len() {
        return Err(QueueError::Unknown);
    }
    /* TODO (opt): avoid this allocation until we have to sleep */
    let mut waiters = vec![(None, 0u64); queues.len()];
    let mut ringers = vec![None; queues.len()];
    let mut spins_left = 100;
    loop {
        // Only arm the per-queue sleep state once the polling budget is used up.
        let arm_sleep = spins_left == 0;
        let mut received = 0;
        let slots = output
            .iter_mut()
            .zip(waiters.iter_mut())
            .zip(ringers.iter_mut());
        for (queue, ((out, waiter), ringer)) in queues.iter().zip(slots) {
            if queue.setup_sleep(arm_sleep, out, waiter, ringer).is_ok() {
                received += 1;
            }
        }
        if received > 0 {
            multi_ring(&ringers);
            return Ok(received);
        }
        if flags.contains(ReceiveFlags::NON_BLOCK) {
            return Err(QueueError::WouldBlock);
        }
        if arm_sleep {
            multi_wait(&waiters);
        } else {
            spins_left -= 1;
        }
    }
}
631
#[cfg(test)]
mod tests {
    // Kept for the (currently commented-out) benchmark at the bottom of this module.
    #![allow(soft_unstable)]
    use std::sync::atomic::{AtomicU64, Ordering};

    //   use syscalls::SyscallArgs;
    use crate::{
        multi_receive, QueueEntry, QueueError, RawQueue, RawQueueHdr, ReceiveFlags,
        SubmissionFlags,
    };

    /// Busy-wait until `*x` no longer equals `v`.
    fn wait(x: &AtomicU64, v: u64) {
        loop {
            if x.load(Ordering::SeqCst) != v {
                break;
            }
            core::hint::spin_loop();
        }
    }

    /// Waking is a no-op because `wait` spins instead of sleeping.
    fn wake(_x: &AtomicU64) {
        //   println!("wake");
    }

    /// Build a header for a queue of `1 << l2len` entries of the type used by these tests.
    fn new_hdr(l2len: usize) -> RawQueueHdr {
        RawQueueHdr::new(l2len, std::mem::size_of::<QueueEntry<i32>>())
    }

    /// Every submitted entry comes back out unchanged, including after the queue wraps.
    #[test]
    fn it_transmits() {
        let qh = new_hdr(4);
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        for i in 0..100 {
            let sent = q.submit(
                QueueEntry::new(i as u32, i * 10),
                wait,
                wake,
                SubmissionFlags::empty(),
            );
            assert_eq!(sent, Ok(()));

            let got = q.receive(wait, wake, ReceiveFlags::empty());
            assert!(got.is_ok());
            let entry = got.unwrap();
            assert_eq!(entry.info(), i as u32);
            assert_eq!(entry.item(), i * 10);
        }
    }

    /// A full queue rejects a non-blocking submit with `WouldBlock`.
    #[test]
    fn it_fills() {
        let qh = new_hdr(2);
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 2];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        for info in 1..=4 {
            let sent = q.submit(
                QueueEntry::new(info, 7),
                wait,
                wake,
                SubmissionFlags::empty(),
            );
            assert_eq!(sent, Ok(()));
        }

        let overflow = q.submit(
            QueueEntry::new(1, 7),
            wait,
            wake,
            SubmissionFlags::NON_BLOCK,
        );
        assert_eq!(overflow, Err(QueueError::WouldBlock));
    }

    /// An empty queue rejects a non-blocking receive with `WouldBlock`.
    #[test]
    fn it_nonblock_receives() {
        let qh = new_hdr(4);
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let sent = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(sent, Ok(()));

        let got = q.receive(wait, wake, ReceiveFlags::empty());
        assert!(got.is_ok());
        let entry = got.unwrap();
        assert_eq!(entry.info(), 1);
        assert_eq!(entry.item(), 7);

        let empty = q.receive(wait, wake, ReceiveFlags::NON_BLOCK);
        assert_eq!(empty.unwrap_err(), QueueError::WouldBlock);
    }

    /// `multi_receive` drains one entry from each queue into the matching output index.
    #[test]
    fn it_multi_receives() {
        let qh1 = new_hdr(4);
        let mut buffer1 = [QueueEntry::<i32>::default(); 1 << 4];
        let q1 = unsafe { RawQueue::new(&qh1, buffer1.as_mut_ptr()) };

        let qh2 = new_hdr(4);
        let mut buffer2 = [QueueEntry::<i32>::default(); 1 << 4];
        let q2 = unsafe { RawQueue::new(&qh2, buffer2.as_mut_ptr()) };

        let sent = q1.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(sent, Ok(()));
        let sent = q2.submit(QueueEntry::new(2, 8), wait, wake, SubmissionFlags::empty());
        assert_eq!(sent, Ok(()));

        let mut output = [None, None];
        let res = multi_receive(
            &[&q1, &q2],
            &mut output,
            |_| {},
            |_| {},
            ReceiveFlags::empty(),
        );
        assert_eq!(res, Ok(2));

        let first = output[0].unwrap();
        assert_eq!(first.info(), 1);
        assert_eq!(first.item(), 7);
        let second = output[1].unwrap();
        assert_eq!(second.info(), 2);
        assert_eq!(second.item(), 8);
    }

    /*
        #[cfg(not(target_os = "twizzler"))]
        extern crate crossbeam;
        #[cfg(not(target_os = "twizzler"))]
        extern crate test;
        #[cfg(not(target_os = "twizzler"))]
        #[bench]
        fn two_threads(b: &mut test::Bencher) -> impl Termination {
            let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
            let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
            let q = unsafe {
                RawQueue::new(
                    std::mem::transmute::<&RawQueueHdr, &'static RawQueueHdr>(&qh),
                    buffer.as_mut_ptr(),
                )
            };

            //let count = AtomicU64::new(0);
            let x = crossbeam::scope(|s| {
                s.spawn(|_| loop {
                    let res = q.receive(wait, wake, ReceiveFlags::empty());
                    assert!(res.is_ok());
                    if res.unwrap().info() == 2 {
                        break;
                    }
                    //count.fetch_add(1, Ordering::SeqCst);
                });

                b.iter(|| {
                    let res = q.submit(QueueEntry::new(1, 2), wait, wake, SubmissionFlags::empty());
                    assert_eq!(res, Ok(()));
                });
                let res = q.submit(QueueEntry::new(2, 2), wait, wake, SubmissionFlags::empty());
                assert_eq!(res, Ok(()));
            });

            x.unwrap();
        }
    */
}