twizzler_queue_raw/
lib.rs

//! A raw queue interface for Twizzler, making no assumptions about where the underlying headers and
//! circular buffers are located. This means you probably don't want to use this --- instead, I
//! suggest you use the wrapped version of this library, twizzler-queue, since that actually
//! interacts with the object system.
//!
//! This library exists to provide an underlying implementation of the concurrent data structure for
//! each individual raw queue so that this complex code can be reused in both userspace and the
//! kernel.
//!
//! The basic design of a raw queue has two parts:
//!
//!   1. A header, which contains things like head pointers, tail pointers, etc.
//!   2. A buffer, which contains the items that are enqueued.
//!
//! The queue is an MPSC lock-free blocking data structure. Any thread may submit to a queue, but
//! only one thread may receive on that queue at a time. The queue is implemented with a head
//! pointer, a tail pointer, a doorbell, and a waiters counter. Additionally, the queue is
//! maintained in terms of "turns", which indicate which "go around" of the queue we are on (mod 2).
//!
//! # Let's look at an insert
//! Here's what the queue looks like to start with. The 0_ indicates that it's empty, and turn is
//! set to 0.
//! ```text
//!  b
//!  t
//!  h
//! [0_, 0_, 0_]
//! ```
//! When inserting, the thread first reserves space:
//! ```text
//!  b
//!  t
//!      h
//! [0_, 0_, 0_]
//! ```
//! Then it fills out the data:
//! ```text
//!  b
//!  t
//!      h
//! [0X, 0_, 0_]
//! ```
//! Then it toggles the turn bit:
//! ```text
//!  b
//!  t
//!      h
//! [1X, 0_, 0_]
//! ```
//! Next, it bumps the doorbell (and maybe wakes up a waiting consumer):
//! ```text
//!      b
//!  t
//!      h
//! [1X, 0_, 0_]
//! ```
//!
//! Now, let's say the consumer comes along and dequeues. First, it checks whether the queue is
//! empty by comparing tail and bell, and finds it's not empty. Then it checks whether the entry's
//! turn bit matches the current turn; here the bit is 1, so it does. Next, it removes the data
//! from the queue:
//! ```text
//!      b
//!  t
//!      h
//! [1_, 0_, 0_]
//! ```
//! And then finally it increments the tail counter:
//! ```text
//!      b
//!      t
//!      h
//! [1_, 0_, 0_]
//! ```

#![cfg_attr(test, feature(test))]
#![cfg_attr(not(any(feature = "std", test)), no_std)]

use core::{
    cell::UnsafeCell,
    fmt::Display,
    marker::PhantomData,
    ptr::addr_of_mut,
    sync::atomic::{AtomicU32, AtomicU64, Ordering},
};

#[derive(Clone, Copy, Default, Debug)]
#[repr(C)]
/// A queue entry. All queues must be formed of these, as the queue algorithm uses data inside this
/// struct as part of its operation. The cmd_slot is used internally to track turn, and the info is
/// used by the full queue structure to manage completion. The data T is user data passed around the
/// queue.
pub struct QueueEntry<T> {
    cmd_slot: u32,
    info: u32,
    data: T,
}

impl<T> QueueEntry<T> {
    #[inline]
    fn get_cmd_slot(&self) -> u32 {
        unsafe { core::mem::transmute::<&u32, &AtomicU32>(&self.cmd_slot).load(Ordering::SeqCst) }
    }

    #[inline]
    fn set_cmd_slot(&self, v: u32) {
        unsafe {
            core::mem::transmute::<&u32, &AtomicU32>(&self.cmd_slot).store(v, Ordering::SeqCst);
        }
    }

    #[inline]
    /// Get the data item of a QueueEntry.
    pub fn item(self) -> T {
        self.data
    }

    #[inline]
    /// Get the info tag of a QueueEntry.
    pub fn info(&self) -> u32 {
        self.info
    }

    /// Construct a new QueueEntry. The `info` tag should be used to inform completion events in the
    /// full queue.
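    ///
    /// A small usage sketch:
    /// ```
    /// use twizzler_queue_raw::QueueEntry;
    /// let entry = QueueEntry::new(7, 1234u32);
    /// assert_eq!(entry.info(), 7);
    /// assert_eq!(entry.item(), 1234);
    /// ```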
    pub fn new(info: u32, item: T) -> Self {
        Self {
            cmd_slot: 0,
            info,
            data: item,
        }
    }
}

/// The base info structure stored in a Twizzler queue object. Used to open Twizzler queue objects
/// and create a Queue.
#[repr(C)]
pub struct QueueBase<S, C> {
    pub sub_hdr: usize,
    pub com_hdr: usize,
    pub sub_buf: usize,
    pub com_buf: usize,
    _pd: PhantomData<(S, C)>,
}

#[repr(C)]
/// A raw queue header. This contains all the necessary counters and info to run the queue
/// algorithm.
pub struct RawQueueHdr {
    l2len: usize,
    stride: usize,
    head: AtomicU32,
    waiters: AtomicU32,
    bell: AtomicU64,
    tail: AtomicU64,
}

impl RawQueueHdr {
    /// Construct a new raw queue header.
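    /// `l2len` is the base-2 logarithm of the queue length (the queue holds `1 << l2len` entries),
    /// and `stride` is the size in bytes of a single queue entry.
    ///
    /// A minimal sketch, constructing a header for a 16-entry queue (the `u32` payload type here is
    /// just for illustration):
    /// ```
    /// use twizzler_queue_raw::{QueueEntry, RawQueueHdr};
    /// let hdr = RawQueueHdr::new(4, core::mem::size_of::<QueueEntry<u32>>());
    /// ```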
    pub fn new(l2len: usize, stride: usize) -> Self {
        Self {
            l2len,
            stride,
            head: AtomicU32::new(0),
            waiters: AtomicU32::new(0),
            bell: AtomicU64::new(0),
            tail: AtomicU64::new(0),
        }
    }

    #[inline]
    fn len(&self) -> usize {
        1 << self.l2len
    }

    #[inline]
    fn is_full(&self, h: u32, t: u64) -> bool {
        (h & 0x7fffffff) as u64 - (t & 0x7fffffff) >= self.len() as u64
    }

    #[inline]
    fn is_empty(&self, bell: u64, tail: u64) -> bool {
        (bell & 0x7fffffff) == (tail & 0x7fffffff)
    }

    #[inline]
    fn is_turn<T>(&self, t: u64, item: *const QueueEntry<T>) -> bool {
        // A submitter sets the top bit of cmd_slot on even passes over the buffer and clears it on
        // odd passes, so an entry is ready for this consumer pass exactly when that bit disagrees
        // with the consumer's pass parity.
        let turn = (t / (self.len() as u64)) % 2;
        let val = unsafe { &*item }.get_cmd_slot() >> 31;
        (val == 0) == (turn == 1)
    }

    #[inline]
    fn consumer_waiting(&self) -> bool {
        (self.tail.load(Ordering::SeqCst) & (1 << 31)) != 0
    }

    #[inline]
    fn submitter_waiting(&self) -> bool {
        self.waiters.load(Ordering::SeqCst) > 0
    }

    #[inline]
    fn consumer_set_waiting(&self, waiting: bool) {
        if waiting {
            self.tail.fetch_or(1 << 31, Ordering::SeqCst);
        } else {
            self.tail.fetch_and(!(1 << 31), Ordering::SeqCst);
        }
    }

    #[inline]
    fn inc_submit_waiting(&self) {
        self.waiters.fetch_add(1, Ordering::SeqCst);
    }

    #[inline]
    fn dec_submit_waiting(&self) {
        self.waiters.fetch_sub(1, Ordering::SeqCst);
    }

    #[inline]
    fn reserve_slot<W: Fn(&AtomicU64, u64)>(
        &self,
        flags: SubmissionFlags,
        wait: W,
    ) -> Result<u32, QueueError> {
        let mut waiter = false;
        let mut attempts = 1000;
        let h = loop {
            let h = self.head.load(Ordering::SeqCst);
            let t = self.tail.load(Ordering::SeqCst);
            if !self.is_full(h, t) {
                if self
                    .head
                    .compare_exchange(h, h + 1, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break h;
                } else {
                    core::hint::spin_loop();
                    continue;
                }
            }

            if flags.contains(SubmissionFlags::NON_BLOCK) {
                return Err(QueueError::WouldBlock);
            }

            if attempts != 0 {
                attempts -= 1;
                core::hint::spin_loop();
                continue;
            }

            if !waiter {
                waiter = true;
                self.inc_submit_waiting();
            }

            let t = self.tail.load(Ordering::SeqCst);
            if self.is_full(h, t) {
                wait(&self.tail, t);
            }
        };

        if waiter {
            self.dec_submit_waiting();
        }

        Ok(h & 0x7fffffff)
    }

    #[inline]
    fn get_turn(&self, h: u32) -> bool {
        (h / self.len() as u32) % 2 == 0
    }

    #[inline]
    fn ring<R: Fn(&AtomicU64)>(&self, ring: R) {
        self.bell.fetch_add(1, Ordering::SeqCst);
        if self.consumer_waiting() {
            ring(&self.bell)
        }
    }

    #[inline]
    fn get_next_ready<W: Fn(&AtomicU64, u64), T>(
        &self,
        wait: W,
        flags: ReceiveFlags,
        raw_buf: *const QueueEntry<T>,
    ) -> Result<u64, QueueError> {
        let mut attempts = 1000;
        let t = loop {
            let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
            let b = self.bell.load(Ordering::SeqCst);
            let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };

            if !self.is_empty(b, t) && self.is_turn(t, item) {
                break t;
            }

            if flags.contains(ReceiveFlags::NON_BLOCK) {
                return Err(QueueError::WouldBlock);
            }

            if attempts != 0 {
                attempts -= 1;
                core::hint::spin_loop();
                continue;
            }

            self.consumer_set_waiting(true);
            let b = self.bell.load(Ordering::SeqCst);
            if self.is_empty(b, t) || !self.is_turn(t, item) {
                wait(&self.bell, b);
            }
        };

        if attempts == 0 {
            self.consumer_set_waiting(false);
        }
        Ok(t)
    }

    fn setup_rec_sleep_simple(&self) -> (&AtomicU64, u64) {
        // TODO: an interface that undoes this.
        self.consumer_set_waiting(true);
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        (&self.bell, t)
    }

    fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
        // TODO: an interface that undoes this.
        self.submitter_waiting();
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        let h = self.head.load(Ordering::SeqCst) & 0x7fffffff;
        if self.is_full(h, t) {
            (&self.tail, t)
        } else {
            (&self.tail, u64::MAX)
        }
    }

    fn setup_rec_sleep<'a, T>(
        &'a self,
        sleep: bool,
        raw_buf: *const QueueEntry<T>,
        waiter: &mut (Option<&'a AtomicU64>, u64),
    ) -> Result<u64, QueueError> {
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        let b = self.bell.load(Ordering::SeqCst);
        let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
        *waiter = (Some(&self.bell), b);
        if self.is_empty(b, t) || !self.is_turn(t, item) {
            if sleep {
                self.consumer_set_waiting(true);
                let b = self.bell.load(Ordering::SeqCst);
                *waiter = (Some(&self.bell), b);
                if !self.is_empty(b, t) && self.is_turn(t, item) {
                    return Ok(t);
                }
            }
            Err(QueueError::WouldBlock)
        } else {
            Ok(t)
        }
    }

    #[inline]
    fn advance_tail<R: Fn(&AtomicU64)>(&self, ring: R) {
        let t = self.tail.load(Ordering::SeqCst);
        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
        if self.submitter_waiting() {
            ring(&self.tail);
        }
    }

    #[inline]
    fn advance_tail_setup<'a>(&'a self, ringer: &mut Option<&'a AtomicU64>) {
        let t = self.tail.load(Ordering::SeqCst);
        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
        if self.submitter_waiting() {
            *ringer = Some(&self.tail);
        }
    }
}

/// A raw queue, comprising a header that tracks the algorithm's state and a buffer that holds the
/// queue entries.
pub struct RawQueue<T> {
    hdr: *const RawQueueHdr,
    buf: UnsafeCell<*mut QueueEntry<T>>,
}

bitflags::bitflags! {
    /// Flags to control how queue submission works.
    pub struct SubmissionFlags: u32 {
        /// If the request would block, return Err([QueueError::WouldBlock]) instead.
        const NON_BLOCK = 1;
    }

    /// Flags to control how queue receive works.
    pub struct ReceiveFlags: u32 {
        /// If the request would block, return Err([QueueError::WouldBlock]) instead.
        const NON_BLOCK = 1;
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
/// Possible errors for queue operations (submission and receive).
pub enum QueueError {
    /// An unknown error.
    Unknown,
    /// The operation would have blocked, and non-blocking operation was specified.
    WouldBlock,
}

impl Display for QueueError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Unknown => write!(f, "unknown"),
            Self::WouldBlock => write!(f, "would block"),
        }
    }
}

impl core::error::Error for QueueError {}

#[cfg(feature = "std")]
impl From<QueueError> for std::io::Error {
    fn from(err: QueueError) -> Self {
        match err {
            QueueError::WouldBlock => std::io::Error::from(std::io::ErrorKind::WouldBlock),
            _ => std::io::Error::from(std::io::ErrorKind::Other),
        }
    }
}

impl<T: Copy> RawQueue<T> {
    /// Construct a new raw queue out of a header reference and a buffer pointer.
    /// # Safety
    /// The caller must ensure that hdr and buf point to valid objects, and that the objects they
    /// point to outlive the RawQueue.
    pub unsafe fn new(hdr: *const RawQueueHdr, buf: *mut QueueEntry<T>) -> Self {
        Self {
            hdr,
            buf: UnsafeCell::new(buf),
        }
    }

    #[inline]
    fn hdr(&self) -> &RawQueueHdr {
        unsafe { &*self.hdr }
    }

    #[inline]
    fn get_buf(&self, off: usize) -> *mut QueueEntry<T> {
        unsafe { (*self.buf.get()).add(off & (self.hdr().len() - 1)) }
    }

    /// Submit a data item of type T, wrapped in a QueueEntry, to the queue. The two callbacks,
    /// `wait` and `ring`, implement a rudimentary condvar: if the queue needs to block, we'll call
    /// wait(x, y), which should wait until *x != y. Once we are done inserting, if we need to wake
    /// up a consumer, we will call ring, which should wake up anyone waiting on that word of
    /// memory.
    pub fn submit<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
        &self,
        item: QueueEntry<T>,
        wait: W,
        ring: R,
        flags: SubmissionFlags,
    ) -> Result<(), QueueError> {
        let h = self.hdr().reserve_slot(flags, wait)?;
        let buf_item = self.get_buf(h as usize);

        // Write these manually, to ensure we do not set cmd_slot until the end.
        unsafe { *addr_of_mut!((*buf_item).data) = item.data };
        unsafe { *addr_of_mut!((*buf_item).info) = item.info };
        let turn = self.hdr().get_turn(h);
        unsafe {
            addr_of_mut!(*buf_item)
                .as_ref()
                .unwrap()
                .set_cmd_slot(h | if turn { 1u32 << 31 } else { 0 })
        };

        self.hdr().ring(ring);
        Ok(())
    }

    /// Receive data from the queue, returning either that data or an error. The wait and ring
    /// callbacks work similarly to [RawQueue::submit].
    pub fn receive<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
        &self,
        wait: W,
        ring: R,
        flags: ReceiveFlags,
    ) -> Result<QueueEntry<T>, QueueError> {
        let t = self
            .hdr()
            .get_next_ready(wait, flags, unsafe { *self.buf.get() })?;
        let buf_item = self.get_buf(t as usize);
        let item = unsafe { buf_item.read() };
        self.hdr().advance_tail(ring);
        Ok(item)
    }

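    /// Set up a sleep operation for receiving on this queue, as used by multi_receive-style
    /// polling. If an entry is ready, it is copied into `output`, the tail is advanced, and, if a
    /// submitter is waiting, the tail word that must be woken is recorded in `ringer`. Otherwise
    /// the bell word and the value to sleep on are recorded in `waiter` and
    /// Err([QueueError::WouldBlock]) is returned; when `sleep` is true, the consumer is also marked
    /// as waiting before the queue state is re-checked.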
    pub fn setup_sleep<'a>(
        &'a self,
        sleep: bool,
        output: &mut Option<QueueEntry<T>>,
        waiter: &mut (Option<&'a AtomicU64>, u64),
        ringer: &mut Option<&'a AtomicU64>,
    ) -> Result<(), QueueError> {
        let t = self
            .hdr()
            .setup_rec_sleep(sleep, unsafe { *self.buf.get() }, waiter)?;
        let buf_item = self.get_buf(t as usize);
        let item = unsafe { buf_item.read() };
        *output = Some(item);
        self.hdr().advance_tail_setup(ringer);
        Ok(())
    }

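    /// Mark the consumer as waiting and return the bell word together with the current tail value.
    /// The caller should sleep until the bell differs from that value, i.e. until a submitter has
    /// enqueued something.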
    #[inline]
    pub fn setup_sleep_simple(&self) -> (&AtomicU64, u64) {
        self.hdr().setup_rec_sleep_simple()
    }

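    /// Return the tail word together with the value a submitter should sleep on: the current tail
    /// if the queue is full, or u64::MAX (a value the tail can never hold, so the caller will not
    /// block) if there is still room.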
    #[inline]
    pub fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
        self.hdr().setup_send_sleep_simple()
    }
}

unsafe impl<T: Send> Send for RawQueue<T> {}
unsafe impl<T: Send> Sync for RawQueue<T> {}

#[cfg(any(feature = "std", test))]
/// Wait for receiving on multiple raw queues. If any of the passed raw queues can return data, they
/// will do so by writing it into the output array at the same index they occupy in `queues`. The
/// queues and output slices must be the same length. If no data is available in any queue, then the
/// function will call back on multi_wait, which it expects to wait until **any** of the pairs
/// (&x, y) meets the condition that *x != y. Before returning any data, the function will call back
/// on multi_ring, to inform multiple queues that data was taken from them. It expects the
/// multi_ring function to wake up any waiting threads on the supplied words of memory.
///
/// Note that both callbacks specify the pointers as Option. In the case that an entry is None,
/// there was no requested wait or wake operation for that queue, and that entry should be ignored.
///
/// If flags specifies [ReceiveFlags::NON_BLOCK], then if no data is available, the function returns
/// immediately with Err([QueueError::WouldBlock]).
///
/// # Rationale
/// This function is here to implement poll or select like functionality, wherein a given thread or
/// program wants to wait on multiple incoming request channels and handle them itself, thus cutting
/// down on the number of threads required. The maximum number of queues to use here is a trade-off
/// --- more means fewer threads, but since this function is linear in the number of queues, each
/// thread could take longer to service requests.
///
/// The complexity of the multi_wait and multi_ring callbacks is present to avoid calling into the
/// kernel often for high-contention queues.
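///
/// # Example
/// A minimal sketch, mirroring the crate's tests (requires the `std` feature; the no-op closures
/// stand in for real wait and wake primitives, which is fine here only because data is already
/// available on both queues):
/// ```ignore
/// use twizzler_queue_raw::{
///     multi_receive, QueueEntry, RawQueue, RawQueueHdr, ReceiveFlags, SubmissionFlags,
/// };
///
/// let hdr1 = RawQueueHdr::new(4, core::mem::size_of::<QueueEntry<u32>>());
/// let mut buf1 = [QueueEntry::<u32>::default(); 1 << 4];
/// let q1 = unsafe { RawQueue::new(&hdr1, buf1.as_mut_ptr()) };
/// let hdr2 = RawQueueHdr::new(4, core::mem::size_of::<QueueEntry<u32>>());
/// let mut buf2 = [QueueEntry::<u32>::default(); 1 << 4];
/// let q2 = unsafe { RawQueue::new(&hdr2, buf2.as_mut_ptr()) };
///
/// q1.submit(QueueEntry::new(1, 7), |_, _| {}, |_| {}, SubmissionFlags::empty()).unwrap();
/// q2.submit(QueueEntry::new(2, 8), |_, _| {}, |_| {}, SubmissionFlags::empty()).unwrap();
///
/// let mut output = [None, None];
/// let n = multi_receive(&[&q1, &q2], &mut output, |_| {}, |_| {}, ReceiveFlags::empty()).unwrap();
/// assert_eq!(n, 2);
/// assert_eq!(output[0].unwrap().item(), 7);
/// assert_eq!(output[1].unwrap().item(), 8);
/// ```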
pub fn multi_receive<T: Copy, W: Fn(&[(Option<&AtomicU64>, u64)]), R: Fn(&[Option<&AtomicU64>])>(
    queues: &[&RawQueue<T>],
    output: &mut [Option<QueueEntry<T>>],
    multi_wait: W,
    multi_ring: R,
    flags: ReceiveFlags,
) -> Result<usize, QueueError> {
    if output.len() != queues.len() {
        return Err(QueueError::Unknown);
    }
    /* TODO (opt): avoid this allocation until we have to sleep */
    let mut waiters = Vec::new();
    waiters.resize(queues.len(), Default::default());
    let mut ringers = Vec::new();
    ringers.resize(queues.len(), None);
    let mut attempts = 100;
    loop {
        let mut count = 0;
        for (i, q) in queues.iter().enumerate() {
            let res = q.setup_sleep(
                attempts == 0,
                &mut output[i],
                &mut waiters[i],
                &mut ringers[i],
            );
            if res == Ok(()) {
                count += 1;
            }
        }
        if count > 0 {
            multi_ring(&ringers);
            return Ok(count);
        }
        if flags.contains(ReceiveFlags::NON_BLOCK) {
            return Err(QueueError::WouldBlock);
        }
        if attempts > 0 {
            attempts -= 1;
        } else {
            multi_wait(&waiters);
        }
    }
}

#[cfg(test)]
mod tests {
    #![allow(soft_unstable)]
    use std::sync::atomic::{AtomicU64, Ordering};

    //   use syscalls::SyscallArgs;
    use crate::multi_receive;
    use crate::{QueueEntry, QueueError, RawQueue, RawQueueHdr, ReceiveFlags, SubmissionFlags};

    fn wait(x: &AtomicU64, v: u64) {
        while x.load(Ordering::SeqCst) == v {
            core::hint::spin_loop();
        }
    }

    fn wake(_x: &AtomicU64) {
        //   println!("wake");
    }

    #[test]
    fn it_transmits() {
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        for i in 0..100 {
            let res = q.submit(
                QueueEntry::new(i as u32, i * 10),
                wait,
                wake,
                SubmissionFlags::empty(),
            );
            assert_eq!(res, Ok(()));
            let res = q.receive(wait, wake, ReceiveFlags::empty());
            assert!(res.is_ok());
            assert_eq!(res.unwrap().info(), i as u32);
            assert_eq!(res.unwrap().item(), i * 10);
        }
    }

    #[test]
    fn it_fills() {
        let qh = RawQueueHdr::new(2, std::mem::size_of::<QueueEntry<u32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 2];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(2, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(3, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(4, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(
            QueueEntry::new(1, 7),
            wait,
            wake,
            SubmissionFlags::NON_BLOCK,
        );
        assert_eq!(res, Err(QueueError::WouldBlock));
    }

    #[test]
    fn it_nonblock_receives() {
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.receive(wait, wake, ReceiveFlags::empty());
        assert!(res.is_ok());
        assert_eq!(res.unwrap().info(), 1);
        assert_eq!(res.unwrap().item(), 7);
        let res = q.receive(wait, wake, ReceiveFlags::NON_BLOCK);
        assert_eq!(res.unwrap_err(), QueueError::WouldBlock);
    }

    #[test]
    fn it_multi_receives() {
        let qh1 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
        let mut buffer1 = [QueueEntry::<i32>::default(); 1 << 4];
        let q1 = unsafe { RawQueue::new(&qh1, buffer1.as_mut_ptr()) };

        let qh2 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
        let mut buffer2 = [QueueEntry::<i32>::default(); 1 << 4];
        let q2 = unsafe { RawQueue::new(&qh2, buffer2.as_mut_ptr()) };

        let res = q1.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q2.submit(QueueEntry::new(2, 8), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));

        let mut output = [None, None];
        let res = multi_receive(
            &[&q1, &q2],
            &mut output,
            |_| {},
            |_| {},
            ReceiveFlags::empty(),
        );
        assert_eq!(res, Ok(2));
        assert_eq!(output[0].unwrap().info(), 1);
        assert_eq!(output[0].unwrap().item(), 7);
        assert_eq!(output[1].unwrap().info(), 2);
        assert_eq!(output[1].unwrap().item(), 8);
    }

    /*
        #[cfg(not(target_os = "twizzler"))]
        extern crate crossbeam;
        #[cfg(not(target_os = "twizzler"))]
        extern crate test;
        #[cfg(not(target_os = "twizzler"))]
        #[bench]
        fn two_threads(b: &mut test::Bencher) -> impl Termination {
            let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<u32>>());
            let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
            let q = unsafe {
                RawQueue::new(
                    std::mem::transmute::<&RawQueueHdr, &'static RawQueueHdr>(&qh),
                    buffer.as_mut_ptr(),
                )
            };

            //let count = AtomicU64::new(0);
            let x = crossbeam::scope(|s| {
                s.spawn(|_| loop {
                    let res = q.receive(wait, wake, ReceiveFlags::empty());
                    assert!(res.is_ok());
                    if res.unwrap().info() == 2 {
                        break;
                    }
                    //count.fetch_add(1, Ordering::SeqCst);
                });

                b.iter(|| {
                    let res = q.submit(QueueEntry::new(1, 2), wait, wake, SubmissionFlags::empty());
                    assert_eq!(res, Ok(()));
                });
                let res = q.submit(QueueEntry::new(2, 2), wait, wake, SubmissionFlags::empty());
                assert_eq!(res, Ok(()));
            });

            x.unwrap();
        }
    */
}