#![cfg_attr(test, feature(test))]
#![cfg_attr(not(any(feature = "std", test)), no_std)]

use core::{
    cell::UnsafeCell,
    fmt::Display,
    marker::PhantomData,
    ptr::addr_of_mut,
    sync::atomic::{AtomicU32, AtomicU64, Ordering},
};

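/// An entry within a raw queue buffer. The `cmd_slot` field is written last by
/// the submitter (with the turn bit in bit 31), so the consumer can tell when
/// the entry's `info` and `data` fields are valid.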
#[derive(Clone, Copy, Default, Debug)]
#[repr(C)]
pub struct QueueEntry<T> {
    cmd_slot: u32,
    info: u32,
    data: T,
}

impl<T> QueueEntry<T> {
    #[inline]
    fn get_cmd_slot(&self) -> u32 {
        // Reinterpret the plain u32 as an AtomicU32 so this load synchronizes
        // with the submitter's store; AtomicU32 has the same layout as u32.
        unsafe { core::mem::transmute::<&u32, &AtomicU32>(&self.cmd_slot).load(Ordering::SeqCst) }
    }

    #[inline]
    fn set_cmd_slot(&self, v: u32) {
        unsafe {
            core::mem::transmute::<&u32, &AtomicU32>(&self.cmd_slot).store(v, Ordering::SeqCst);
        }
    }

    /// Consume the entry, returning its data item.
    #[inline]
    pub fn item(self) -> T {
        self.data
    }

    /// Get the info tag of this entry.
    #[inline]
    pub fn info(&self) -> u32 {
        self.info
    }

    /// Construct a new queue entry from an info tag and a data item.
    pub fn new(info: u32, item: T) -> Self {
        Self {
            cmd_slot: 0,
            info,
            data: item,
        }
    }
}

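/// The raw addresses of the submission ("sub") and completion ("com") queue
/// headers and buffers that make up a bidirectional queue pair, typed by the
/// submission entry type `S` and the completion entry type `C`.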
#[repr(C)]
pub struct QueueBase<S, C> {
    pub sub_hdr: usize,
    pub com_hdr: usize,
    pub sub_buf: usize,
    pub com_buf: usize,
    _pd: PhantomData<(S, C)>,
}

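/// The shared control header for one raw queue. All fields are updated with
/// atomic operations so that submitters and the consumer can coordinate through
/// this header alone.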
#[repr(C)]
pub struct RawQueueHdr {
    /// Log2 of the number of entries in the ring buffer.
    l2len: usize,
    /// Size in bytes of a single queue entry.
    stride: usize,
    /// Next slot to be claimed by a submitter.
    head: AtomicU32,
    /// Number of submitters currently waiting for space.
    waiters: AtomicU32,
    /// Count of submitted entries; the consumer sleeps on this "doorbell".
    bell: AtomicU64,
    /// Next slot to be consumed; bit 31 flags that the consumer is waiting.
    tail: AtomicU64,
}

impl RawQueueHdr {
    /// Construct a new queue header for a queue of `1 << l2len` entries, each
    /// `stride` bytes long.
    pub fn new(l2len: usize, stride: usize) -> Self {
        Self {
            l2len,
            stride,
            head: AtomicU32::new(0),
            waiters: AtomicU32::new(0),
            bell: AtomicU64::new(0),
            tail: AtomicU64::new(0),
        }
    }

    #[inline]
    fn len(&self) -> usize {
        1 << self.l2len
    }

    #[inline]
    fn is_full(&self, h: u32, t: u64) -> bool {
        // Head and tail are compared modulo 2^31, since bit 31 of the tail is
        // reserved for the consumer-waiting flag.
        (h & 0x7fffffff) as u64 - (t & 0x7fffffff) >= self.len() as u64
    }

    #[inline]
    fn is_empty(&self, bell: u64, tail: u64) -> bool {
        (bell & 0x7fffffff) == (tail & 0x7fffffff)
    }

    #[inline]
    fn is_turn<T>(&self, t: u64, item: *const QueueEntry<T>) -> bool {
        // The turn bit (bit 31 of cmd_slot) flips on each pass over the ring,
        // so the consumer can distinguish a fresh entry from a stale one.
        let turn = (t / (self.len() as u64)) % 2;
        let val = unsafe { &*item }.get_cmd_slot() >> 31;
        (val == 0) == (turn == 1)
    }

    #[inline]
    fn consumer_waiting(&self) -> bool {
        (self.tail.load(Ordering::SeqCst) & (1 << 31)) != 0
    }

    #[inline]
    fn submitter_waiting(&self) -> bool {
        self.waiters.load(Ordering::SeqCst) > 0
    }

    #[inline]
    fn consumer_set_waiting(&self, waiting: bool) {
        if waiting {
            self.tail.fetch_or(1 << 31, Ordering::SeqCst);
        } else {
            self.tail.fetch_and(!(1 << 31), Ordering::SeqCst);
        }
    }

    #[inline]
    fn inc_submit_waiting(&self) {
        self.waiters.fetch_add(1, Ordering::SeqCst);
    }

    #[inline]
    fn dec_submit_waiting(&self) {
        self.waiters.fetch_sub(1, Ordering::SeqCst);
    }

    #[inline]
    fn reserve_slot<W: Fn(&AtomicU64, u64)>(
        &self,
        flags: SubmissionFlags,
        wait: W,
    ) -> Result<u32, QueueError> {
        let mut waiter = false;
        let mut attempts = 1000;
        let h = loop {
            let h = self.head.load(Ordering::SeqCst);
            let t = self.tail.load(Ordering::SeqCst);
            if !self.is_full(h, t) {
                // Try to claim slot h; on contention, retry from the top.
                if self
                    .head
                    .compare_exchange(h, h + 1, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break h;
                } else {
                    core::hint::spin_loop();
                    continue;
                }
            }

            if flags.contains(SubmissionFlags::NON_BLOCK) {
                return Err(QueueError::WouldBlock);
            }

            // Spin for a while before committing to a sleep.
            if attempts != 0 {
                attempts -= 1;
                core::hint::spin_loop();
                continue;
            }

            if !waiter {
                waiter = true;
                self.inc_submit_waiting();
            }

            // Recheck fullness after registering as a waiter, then sleep on the
            // tail until the consumer advances it.
            let t = self.tail.load(Ordering::SeqCst);
            if self.is_full(h, t) {
                wait(&self.tail, t);
            }
        };

        if waiter {
            self.dec_submit_waiting();
        }

        Ok(h & 0x7fffffff)
    }

    #[inline]
    fn get_turn(&self, h: u32) -> bool {
        (h / self.len() as u32) % 2 == 0
    }

    #[inline]
    fn ring<R: Fn(&AtomicU64)>(&self, ring: R) {
        // Publish the new entry on the doorbell, and wake the consumer only if
        // it has actually gone to sleep.
        self.bell.fetch_add(1, Ordering::SeqCst);
        if self.consumer_waiting() {
            ring(&self.bell)
        }
    }

    #[inline]
    fn get_next_ready<W: Fn(&AtomicU64, u64), T>(
        &self,
        wait: W,
        flags: ReceiveFlags,
        raw_buf: *const QueueEntry<T>,
    ) -> Result<u64, QueueError> {
        let mut attempts = 1000;
        let t = loop {
            let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
            let b = self.bell.load(Ordering::SeqCst);
            let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };

            if !self.is_empty(b, t) && self.is_turn(t, item) {
                break t;
            }

            if flags.contains(ReceiveFlags::NON_BLOCK) {
                return Err(QueueError::WouldBlock);
            }

            // Spin for a while before committing to a sleep.
            if attempts != 0 {
                attempts -= 1;
                core::hint::spin_loop();
                continue;
            }

            // Recheck readiness after setting the waiting flag, then sleep on
            // the bell until a submitter rings it.
            self.consumer_set_waiting(true);
            let b = self.bell.load(Ordering::SeqCst);
            if self.is_empty(b, t) || !self.is_turn(t, item) {
                wait(&self.bell, b);
            }
        };

        if attempts == 0 {
            self.consumer_set_waiting(false);
        }
        Ok(t)
    }

    /// Prepare a (pointer, value) pair for the consumer to sleep on: the bell
    /// still holds the returned value only while the queue is empty.
    fn setup_rec_sleep_simple(&self) -> (&AtomicU64, u64) {
        self.consumer_set_waiting(true);
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        (&self.bell, t)
    }

    fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
        // Register this submitter as waiting so the consumer rings the tail.
        self.inc_submit_waiting();
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        let h = self.head.load(Ordering::SeqCst) & 0x7fffffff;
        if self.is_full(h, t) {
            (&self.tail, t)
        } else {
            (&self.tail, u64::MAX)
        }
    }

    fn setup_rec_sleep<'a, T>(
        &'a self,
        sleep: bool,
        raw_buf: *const QueueEntry<T>,
        waiter: &mut (Option<&'a AtomicU64>, u64),
    ) -> Result<u64, QueueError> {
        let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
        let b = self.bell.load(Ordering::SeqCst);
        let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
        *waiter = (Some(&self.bell), b);
        if self.is_empty(b, t) || !self.is_turn(t, item) {
            if sleep {
                // Recheck readiness after setting the waiting flag, since a
                // submitter may have rung the bell in the meantime.
                self.consumer_set_waiting(true);
                let b = self.bell.load(Ordering::SeqCst);
                *waiter = (Some(&self.bell), b);
                if !self.is_empty(b, t) && self.is_turn(t, item) {
                    return Ok(t);
                }
            }
            Err(QueueError::WouldBlock)
        } else {
            Ok(t)
        }
    }

    #[inline]
    fn advance_tail<R: Fn(&AtomicU64)>(&self, ring: R) {
        // Advancing the tail also clears the consumer-waiting bit (bit 31).
        let t = self.tail.load(Ordering::SeqCst);
        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
        if self.submitter_waiting() {
            ring(&self.tail);
        }
    }

    #[inline]
    fn advance_tail_setup<'a>(&'a self, ringer: &mut Option<&'a AtomicU64>) {
        let t = self.tail.load(Ordering::SeqCst);
        self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
        if self.submitter_waiting() {
            *ringer = Some(&self.tail);
        }
    }
}

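/// A raw, unbuffered view of a single queue: a pointer to the shared header and
/// a pointer to the ring buffer of entries.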
pub struct RawQueue<T> {
    hdr: *const RawQueueHdr,
    buf: UnsafeCell<*mut QueueEntry<T>>,
}

bitflags::bitflags! {
    pub struct SubmissionFlags: u32 {
        /// If the queue is full, fail with WouldBlock instead of blocking.
        const NON_BLOCK = 1;
    }

    pub struct ReceiveFlags: u32 {
        /// If the queue is empty, fail with WouldBlock instead of blocking.
        const NON_BLOCK = 1;
    }
}

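/// Error returned by raw queue operations.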
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum QueueError {
    /// An unknown error occurred.
    Unknown,
    /// The operation would have blocked, and non-blocking mode was specified.
    WouldBlock,
}

impl Display for QueueError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Unknown => write!(f, "unknown"),
            Self::WouldBlock => write!(f, "would block"),
        }
    }
}

impl core::error::Error for QueueError {}

#[cfg(feature = "std")]
impl From<QueueError> for std::io::Error {
    fn from(err: QueueError) -> Self {
        match err {
            QueueError::WouldBlock => std::io::Error::from(std::io::ErrorKind::WouldBlock),
            _ => std::io::Error::from(std::io::ErrorKind::Other),
        }
    }
}

impl<T: Copy> RawQueue<T> {
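    /// Construct a new raw queue from a header and a buffer of queue entries.
    ///
    /// # Safety
    /// The caller must ensure that `hdr` and `buf` point to valid memory that
    /// lives as long as this queue, and that `buf` has space for the number of
    /// entries specified by the header's length.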
    pub unsafe fn new(hdr: *const RawQueueHdr, buf: *mut QueueEntry<T>) -> Self {
        Self {
            hdr,
            buf: UnsafeCell::new(buf),
        }
    }

    #[inline]
    fn hdr(&self) -> &RawQueueHdr {
        unsafe { &*self.hdr }
    }

    #[inline]
    fn get_buf(&self, off: usize) -> *mut QueueEntry<T> {
        // The ring buffer length is a power of two, so masking wraps the offset.
        unsafe { (*self.buf.get()).add(off & (self.hdr().len() - 1)) }
    }

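    /// Submit a data item. When the queue is full, the `wait` closure is called
    /// with an atomic and its current value so the caller can sleep until that
    /// value changes; `ring` is called to wake a sleeping consumer. Keeping
    /// these as closures leaves the sleep and wake mechanism to the caller.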
    pub fn submit<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
        &self,
        item: QueueEntry<T>,
        wait: W,
        ring: R,
        flags: SubmissionFlags,
    ) -> Result<(), QueueError> {
        let h = self.hdr().reserve_slot(flags, wait)?;
        let buf_item = self.get_buf(h as usize);

        // Fill in the entry's data and info first; the cmd_slot store below is
        // what makes the entry visible to the consumer.
        unsafe { *addr_of_mut!((*buf_item).data) = item.data };
        unsafe { *addr_of_mut!((*buf_item).info) = item.info };
        let turn = self.hdr().get_turn(h);
        unsafe {
            addr_of_mut!(*buf_item)
                .as_ref()
                .unwrap()
                .set_cmd_slot(h | if turn { 1u32 << 31 } else { 0 })
        };

        self.hdr().ring(ring);
        Ok(())
    }

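    /// Receive a single entry, blocking via the `wait` closure if the queue is
    /// empty (unless NON_BLOCK is set) and waking sleeping submitters via
    /// `ring` after freeing a slot.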
    pub fn receive<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
        &self,
        wait: W,
        ring: R,
        flags: ReceiveFlags,
    ) -> Result<QueueEntry<T>, QueueError> {
        let t = self
            .hdr()
            .get_next_ready(wait, flags, unsafe { *self.buf.get() })?;
        let buf_item = self.get_buf(t as usize);
        let item = unsafe { buf_item.read() };
        self.hdr().advance_tail(ring);
        Ok(item)
    }

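    /// Try to receive without sleeping. On success the entry is written to
    /// `output` and `ringer` is set if a submitter needs waking; on WouldBlock,
    /// `waiter` is filled with the atomic and value the caller should sleep on.
    /// Used by `multi_receive` to wait on several queues at once.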
    pub fn setup_sleep<'a>(
        &'a self,
        sleep: bool,
        output: &mut Option<QueueEntry<T>>,
        waiter: &mut (Option<&'a AtomicU64>, u64),
        ringer: &mut Option<&'a AtomicU64>,
    ) -> Result<(), QueueError> {
        let t = self
            .hdr()
            .setup_rec_sleep(sleep, unsafe { *self.buf.get() }, waiter)?;
        let buf_item = self.get_buf(t as usize);
        let item = unsafe { buf_item.read() };
        *output = Some(item);
        self.hdr().advance_tail_setup(ringer);
        Ok(())
    }

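    /// Prepare the (atomic, value) pair a consumer should sleep on while
    /// waiting for new entries.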
    #[inline]
    pub fn setup_sleep_simple(&self) -> (&AtomicU64, u64) {
        self.hdr().setup_rec_sleep_simple()
    }

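    /// Prepare the (atomic, value) pair a submitter should sleep on while the
    /// queue is full; the value is `u64::MAX` if the queue has space.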
    #[inline]
    pub fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
        self.hdr().setup_send_sleep_simple()
    }
}

// Safety: all access to the shared header and buffer is synchronized with
// atomic operations.
unsafe impl<T: Send> Send for RawQueue<T> {}
unsafe impl<T: Send> Sync for RawQueue<T> {}

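/// Receive entries from multiple queues simultaneously. Each queue is polled in
/// turn via `setup_sleep`; if every queue is empty after a bounded number of
/// spin attempts, `multi_wait` is called with one (atomic, value) pair per
/// queue so the caller can sleep until any of them changes. On success,
/// `output[i]` holds the entry received from `queues[i]` (or None),
/// `multi_ring` is called with the tails of any queues whose submitters need
/// waking, and the number of entries received is returned. `output` must have
/// the same length as `queues`.
///
/// A minimal call sketch (hypothetical `q1`/`q2` queues; these closures spin or
/// do nothing, where a real caller would use futex-style sleep and wake):
///
/// ```ignore
/// let mut output = [None, None];
/// let n = multi_receive(
///     &[&q1, &q2],
///     &mut output,
///     |_waiters| { /* sleep until one of the (atomic, value) pairs changes */ },
///     |_ringers| { /* wake submitters sleeping on these atomics */ },
///     ReceiveFlags::empty(),
/// )?;
/// ```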
#[cfg(any(feature = "std", test))]
pub fn multi_receive<T: Copy, W: Fn(&[(Option<&AtomicU64>, u64)]), R: Fn(&[Option<&AtomicU64>])>(
    queues: &[&RawQueue<T>],
    output: &mut [Option<QueueEntry<T>>],
    multi_wait: W,
    multi_ring: R,
    flags: ReceiveFlags,
) -> Result<usize, QueueError> {
    if output.len() != queues.len() {
        return Err(QueueError::Unknown);
    }
    let mut waiters = Vec::new();
    waiters.resize(queues.len(), Default::default());
    let mut ringers = Vec::new();
    ringers.resize(queues.len(), None);
    let mut attempts = 100;
    loop {
        let mut count = 0;
        for (i, q) in queues.iter().enumerate() {
            // Only ask setup_sleep to arm the waiting flag once spinning is done.
            let res = q.setup_sleep(
                attempts == 0,
                &mut output[i],
                &mut waiters[i],
                &mut ringers[i],
            );
            if res == Ok(()) {
                count += 1;
            }
        }
        if count > 0 {
            multi_ring(&ringers);
            return Ok(count);
        }
        if flags.contains(ReceiveFlags::NON_BLOCK) {
            return Err(QueueError::WouldBlock);
        }
        if attempts > 0 {
            attempts -= 1;
        } else {
            multi_wait(&waiters);
        }
    }
}

#[cfg(test)]
mod tests {
    #![allow(soft_unstable)]
    use std::sync::atomic::{AtomicU64, Ordering};

    use crate::multi_receive;
    use crate::{QueueEntry, QueueError, RawQueue, RawQueueHdr, ReceiveFlags, SubmissionFlags};

    // Test wait/wake callbacks: waiting spins until the value changes, so
    // waking can be a no-op.
    fn wait(x: &AtomicU64, v: u64) {
        while x.load(Ordering::SeqCst) == v {
            core::hint::spin_loop();
        }
    }

    fn wake(_x: &AtomicU64) {}

    #[test]
    fn it_transmits() {
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        for i in 0..100 {
            let res = q.submit(
                QueueEntry::new(i as u32, i * 10),
                wait,
                wake,
                SubmissionFlags::empty(),
            );
            assert_eq!(res, Ok(()));
            let res = q.receive(wait, wake, ReceiveFlags::empty());
            assert!(res.is_ok());
            assert_eq!(res.unwrap().info(), i as u32);
            assert_eq!(res.unwrap().item(), i * 10);
        }
    }

    #[test]
    fn it_fills() {
        let qh = RawQueueHdr::new(2, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 2];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(2, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(3, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(4, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        // The queue holds 4 entries, so a fifth non-blocking submit must fail.
        let res = q.submit(
            QueueEntry::new(1, 7),
            wait,
            wake,
            SubmissionFlags::NON_BLOCK,
        );
        assert_eq!(res, Err(QueueError::WouldBlock));
    }

    #[test]
    fn it_nonblock_receives() {
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.receive(wait, wake, ReceiveFlags::empty());
        assert!(res.is_ok());
        assert_eq!(res.unwrap().info(), 1);
        assert_eq!(res.unwrap().item(), 7);
        // The queue is now empty, so a non-blocking receive must fail.
        let res = q.receive(wait, wake, ReceiveFlags::NON_BLOCK);
        assert_eq!(res.unwrap_err(), QueueError::WouldBlock);
    }

    #[test]
    fn it_multi_receives() {
        let qh1 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer1 = [QueueEntry::<i32>::default(); 1 << 4];
        let q1 = unsafe { RawQueue::new(&qh1, buffer1.as_mut_ptr()) };

        let qh2 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer2 = [QueueEntry::<i32>::default(); 1 << 4];
        let q2 = unsafe { RawQueue::new(&qh2, buffer2.as_mut_ptr()) };

        let res = q1.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q2.submit(QueueEntry::new(2, 8), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));

        let mut output = [None, None];
        let res = multi_receive(
            &[&q1, &q2],
            &mut output,
            |_| {},
            |_| {},
            ReceiveFlags::empty(),
        );
        assert_eq!(res, Ok(2));
        assert_eq!(output[0].unwrap().info(), 1);
        assert_eq!(output[0].unwrap().item(), 7);
        assert_eq!(output[1].unwrap().info(), 2);
        assert_eq!(output[1].unwrap().item(), 8);
    }
}