1#![cfg_attr(test, feature(test))]
76#![cfg_attr(not(any(feature = "std", test)), no_std)]
77
78use core::{
79 cell::UnsafeCell,
80 fmt::Display,
81 marker::PhantomData,
82 ptr::addr_of_mut,
83 sync::atomic::{AtomicU32, AtomicU64, Ordering},
84};
85
/// One element of a queue buffer: a 32-bit `info` word and a payload of type `T`, preceded by
/// a control word that the queue code reads and writes atomically.
#[derive(Clone, Copy, Default, Debug)]
#[repr(C)]
pub struct QueueEntry<T> {
    // Control word; only touched through `get_cmd_slot` / `set_cmd_slot`.
    cmd_slot: u32,
    // Caller-supplied word returned by `info()`.
    info: u32,
    // Caller-supplied payload returned by `item()`.
    data: T,
}

impl<T> QueueEntry<T> {
    /// Reinterprets the `cmd_slot` field as an `AtomicU32` located at the same address.
    #[inline]
    fn cmd_slot_atomic(&self) -> &AtomicU32 {
        let raw = &self.cmd_slot as *const u32;
        // SAFETY: `AtomicU32` is documented to have the same size and bit validity as `u32`,
        // and the pointer comes from a live reference to the field.
        // NOTE(review): `set_cmd_slot` stores through this view while only holding `&self`,
        // and the field is not wrapped in an `UnsafeCell` -- confirm this is acceptable for
        // the way entries are shared.
        unsafe { &*raw.cast::<AtomicU32>() }
    }

    /// Atomically loads the control word (sequentially consistent).
    #[inline]
    fn get_cmd_slot(&self) -> u32 {
        self.cmd_slot_atomic().load(Ordering::SeqCst)
    }

    /// Atomically stores `v` into the control word (sequentially consistent).
    #[inline]
    fn set_cmd_slot(&self, v: u32) {
        self.cmd_slot_atomic().store(v, Ordering::SeqCst);
    }

    /// Consumes the entry and returns its payload.
    #[inline]
    pub fn item(self) -> T {
        let Self { data, .. } = self;
        data
    }

    /// Returns the entry's `info` word.
    #[inline]
    pub fn info(&self) -> u32 {
        self.info
    }

    /// Builds an entry carrying `info` and `item`; the control word starts at zero.
    pub fn new(info: u32, item: T) -> Self {
        let data = item;
        Self {
            cmd_slot: 0,
            info,
            data,
        }
    }
}
133
/// `repr(C)` record of four `usize` values describing a queue pair, tagged with the
/// submission (`S`) and completion (`C`) payload types.
///
/// NOTE(review): the fields are presumably offsets or addresses of the two headers and the two
/// entry buffers -- nothing in this file reads them, so confirm against the users of this type.
#[repr(C)]
pub struct QueueBase<S, C> {
    // Presumably locates the submission queue's header -- TODO confirm.
    pub sub_hdr: usize,
    // Presumably locates the completion queue's header -- TODO confirm.
    pub com_hdr: usize,
    // Presumably locates the submission queue's entry buffer -- TODO confirm.
    pub sub_buf: usize,
    // Presumably locates the completion queue's entry buffer -- TODO confirm.
    pub com_buf: usize,
    // Zero-sized marker tying `S` and `C` to the type without storing either.
    _pd: PhantomData<(S, C)>,
}
144
/// Header of a raw queue: its geometry plus the atomic indices and counters that submitters
/// and the consumer use to coordinate.
#[repr(C)]
pub struct RawQueueHdr {
    // Base-2 logarithm of the number of entries (`len()` is `1 << l2len`).
    l2len: usize,
    // Size of one entry in bytes (`len_bytes()` is `len() * stride`).
    stride: usize,
    // Running index of the next slot to hand to a submitter; the low 31 bits are the index.
    head: AtomicU32,
    // Number of submitters currently registered as waiting for space.
    waiters: AtomicU32,
    // Incremented once per submission; the consumer sleeps on this word.
    bell: AtomicU64,
    // Low 31 bits: running index of the next entry to consume. Bit 31: consumer-waiting flag.
    tail: AtomicU64,
}
156
157impl RawQueueHdr {
158 pub fn new(l2len: usize, stride: usize) -> Self {
160 Self {
161 l2len,
162 stride,
163 head: AtomicU32::new(0),
164 waiters: AtomicU32::new(0),
165 bell: AtomicU64::new(0),
166 tail: AtomicU64::new(0),
167 }
168 }
169
170 pub fn len_bytes(&self) -> usize {
171 self.len() * self.stride
172 }
173
174 #[inline]
175 pub fn len(&self) -> usize {
176 1 << self.l2len
177 }
178
179 #[inline]
180 fn is_full(&self, h: u32, t: u64) -> bool {
181 (h & 0x7fffffff) as u64 - (t & 0x7fffffff) >= self.len() as u64
182 }
183
184 #[inline]
185 fn is_empty(&self, bell: u64, tail: u64) -> bool {
186 (bell & 0x7fffffff) == (tail & 0x7fffffff)
187 }
188
189 #[inline]
190 fn is_turn<T>(&self, t: u64, item: *const QueueEntry<T>) -> bool {
191 let turn = (t / (self.len() as u64)) % 2;
192 let val = unsafe { &*item }.get_cmd_slot() >> 31;
193 (val == 0) == (turn == 1)
194 }
195
196 #[inline]
197 fn consumer_waiting(&self) -> bool {
198 (self.tail.load(Ordering::SeqCst) & (1 << 31)) != 0
199 }
200
201 #[inline]
202 fn submitter_waiting(&self) -> bool {
203 self.waiters.load(Ordering::SeqCst) > 0
204 }
205
206 #[inline]
207 fn consumer_set_waiting(&self, waiting: bool) {
208 if waiting {
209 self.tail.fetch_or(1 << 31, Ordering::SeqCst);
210 } else {
211 self.tail.fetch_and(!(1 << 31), Ordering::SeqCst);
212 }
213 }
214
215 #[inline]
216 fn inc_submit_waiting(&self) {
217 self.waiters.fetch_add(1, Ordering::SeqCst);
218 }
219
220 #[inline]
221 fn dec_submit_waiting(&self) {
222 self.waiters.fetch_sub(1, Ordering::SeqCst);
223 }
224
225 #[inline]
226 fn reserve_slot<W: Fn(&AtomicU64, u64)>(
227 &self,
228 flags: SubmissionFlags,
229 wait: W,
230 ) -> Result<u32, QueueError> {
231 let mut waiter = false;
232 let mut attempts = 1000;
233 let h = loop {
234 let h = self.head.load(Ordering::SeqCst);
235 let t = self.tail.load(Ordering::SeqCst);
236 if !self.is_full(h, t) {
237 if self
238 .head
239 .compare_exchange(h, h + 1, Ordering::SeqCst, Ordering::SeqCst)
240 .is_ok()
241 {
242 break h;
243 } else {
244 core::hint::spin_loop();
245 continue;
246 }
247 }
248
249 if flags.contains(SubmissionFlags::NON_BLOCK) {
250 return Err(QueueError::WouldBlock);
251 }
252
253 if attempts != 0 {
254 attempts -= 1;
255 core::hint::spin_loop();
256 continue;
257 }
258
259 if !waiter {
260 waiter = true;
261 self.inc_submit_waiting();
262 }
263
264 let t = self.tail.load(Ordering::SeqCst);
265 if self.is_full(h, t) {
266 wait(&self.tail, t);
267 }
268 };
269
270 if waiter {
271 self.dec_submit_waiting();
272 }
273
274 Ok(h & 0x7fffffff)
275 }
276
277 #[inline]
278 fn get_turn(&self, h: u32) -> bool {
279 (h / self.len() as u32) % 2 == 0
280 }
281
282 #[inline]
283 fn ring<R: Fn(&AtomicU64)>(&self, ring: R) {
284 self.bell.fetch_add(1, Ordering::SeqCst);
285 if self.consumer_waiting() {
286 ring(&self.bell)
287 }
288 }
289
290 pub fn has_pending<T>(&self, raw_buf: *const QueueEntry<T>) -> bool {
291 let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
292 let b = self.bell.load(Ordering::SeqCst);
293 let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
294 !self.is_empty(b, t) && self.is_turn(t, item)
295 }
296
297 pub fn has_space<T>(&self) -> bool {
298 let h = self.head.load(Ordering::SeqCst);
299 let t = self.tail.load(Ordering::SeqCst);
300 !self.is_full(h, t)
301 }
302
303 #[inline]
304 fn get_next_ready<W: Fn(&AtomicU64, u64), T>(
305 &self,
306 wait: W,
307 flags: ReceiveFlags,
308 raw_buf: *const QueueEntry<T>,
309 ) -> Result<u64, QueueError> {
310 let mut attempts = 1000;
311 let t = loop {
312 let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
313 let b = self.bell.load(Ordering::SeqCst);
314 let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
315
316 if !self.is_empty(b, t) && self.is_turn(t, item) {
317 break t;
318 }
319
320 if flags.contains(ReceiveFlags::NON_BLOCK) {
321 return Err(QueueError::WouldBlock);
322 }
323
324 if attempts != 0 {
325 attempts -= 1;
326 core::hint::spin_loop();
327 continue;
328 }
329
330 self.consumer_set_waiting(true);
331 let b = self.bell.load(Ordering::SeqCst);
332 if self.is_empty(b, t) || !self.is_turn(t, item) {
333 wait(&self.bell, b);
334 }
335 };
336
337 if attempts == 0 {
338 self.consumer_set_waiting(false);
339 }
340 Ok(t)
341 }
342
343 fn setup_rec_sleep_simple(&self) -> (&AtomicU64, u64) {
344 self.consumer_set_waiting(true);
346 let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
347 (&self.bell, t)
348 }
349
350 fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
351 self.submitter_waiting();
353 let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
354 let h = self.head.load(Ordering::SeqCst) & 0x7fffffff;
355 if self.is_full(h, t) {
356 (&self.tail, t)
357 } else {
358 (&self.tail, u64::MAX)
359 }
360 }
361
362 fn setup_rec_sleep<'a, T>(
363 &'a self,
364 sleep: bool,
365 raw_buf: *const QueueEntry<T>,
366 waiter: &mut (Option<&'a AtomicU64>, u64),
367 ) -> Result<u64, QueueError> {
368 let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff;
369 let b = self.bell.load(Ordering::SeqCst);
370 let item = unsafe { raw_buf.add((t as usize) & (self.len() - 1)) };
371 *waiter = (Some(&self.bell), b);
372 if self.is_empty(b, t) || !self.is_turn(t, item) {
373 if sleep {
374 self.consumer_set_waiting(true);
375 let b = self.bell.load(Ordering::SeqCst);
376 *waiter = (Some(&self.bell), b);
377 if !self.is_empty(b, t) && self.is_turn(t, item) {
378 return Ok(t);
379 }
380 }
381 Err(QueueError::WouldBlock)
382 } else {
383 Ok(t)
384 }
385 }
386
387 #[inline]
388 fn advance_tail<R: Fn(&AtomicU64)>(&self, ring: R) {
389 let t = self.tail.load(Ordering::SeqCst);
390 self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
391 if self.submitter_waiting() {
392 ring(&self.tail);
393 }
394 }
395
396 #[inline]
397 fn advance_tail_setup<'a>(&'a self, ringer: &mut Option<&'a AtomicU64>) {
398 let t = self.tail.load(Ordering::SeqCst);
399 self.tail.store((t + 1) & 0x7fffffff, Ordering::SeqCst);
400 if self.submitter_waiting() {
401 *ringer = Some(&self.tail);
402 }
403 }
404}
405
/// Handle to one raw queue: a pointer to its header plus a pointer to its entry buffer.
pub struct RawQueue<T> {
    // Header holding the indices and counters; dereferenced by `hdr()`.
    hdr: *const RawQueueHdr,
    // Pointer to the first entry of the buffer, read through the cell on every access.
    buf: UnsafeCell<*mut QueueEntry<T>>,
}
411
bitflags::bitflags! {
    /// Options for submitting an entry to a queue.
    pub struct SubmissionFlags: u32 {
        /// Return `QueueError::WouldBlock` instead of waiting when the queue is full.
        const NON_BLOCK = 1;
    }

    /// Options for receiving an entry from a queue.
    pub struct ReceiveFlags: u32 {
        /// Return `QueueError::WouldBlock` instead of waiting when no entry is ready.
        const NON_BLOCK = 1;
    }
}
425
/// Failure modes reported by queue operations.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum QueueError {
    /// Unspecified failure.
    Unknown,
    /// The operation could not complete without waiting and waiting was not permitted.
    WouldBlock,
}

impl Display for QueueError {
    /// Writes a short lowercase description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let text = match *self {
            QueueError::Unknown => "unknown",
            QueueError::WouldBlock => "would block",
        };
        f.write_str(text)
    }
}

impl core::error::Error for QueueError {}
445
/// Maps queue errors onto `std::io::Error`: `WouldBlock` keeps its meaning, and `Unknown`
/// becomes `ErrorKind::Other`.
#[cfg(feature = "std")]
impl From<QueueError> for std::io::Error {
    fn from(err: QueueError) -> Self {
        use std::io::ErrorKind;

        let kind = match err {
            QueueError::WouldBlock => ErrorKind::WouldBlock,
            QueueError::Unknown => ErrorKind::Other,
        };
        kind.into()
    }
}
455
456impl<T: Copy> RawQueue<T> {
457 pub unsafe fn new(hdr: *const RawQueueHdr, buf: *mut QueueEntry<T>) -> Self {
462 Self {
463 hdr,
464 buf: UnsafeCell::new(buf),
465 }
466 }
467
468 #[inline]
469 pub fn hdr(&self) -> &RawQueueHdr {
470 unsafe { &*self.hdr }
471 }
472
473 #[inline]
474 fn get_buf(&self, off: usize) -> *mut QueueEntry<T> {
475 unsafe { (*self.buf.get()).add(off & (self.hdr().len() - 1)) }
476 }
477
478 pub fn submit<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
484 &self,
485 item: QueueEntry<T>,
486 wait: W,
487 ring: R,
488 flags: SubmissionFlags,
489 ) -> Result<(), QueueError> {
490 let h = self.hdr().reserve_slot(flags, wait)?;
491 let buf_item = self.get_buf(h as usize);
492
493 unsafe { *addr_of_mut!((*buf_item).data) = item.data };
495 unsafe { *addr_of_mut!((*buf_item).info) = item.info };
496 let turn = self.hdr().get_turn(h);
497 unsafe {
498 addr_of_mut!(*buf_item)
499 .as_ref()
500 .unwrap()
501 .set_cmd_slot(h | if turn { 1u32 << 31 } else { 0 })
502 };
503
504 self.hdr().ring(ring);
505 Ok(())
506 }
507
508 pub fn receive<W: Fn(&AtomicU64, u64), R: Fn(&AtomicU64)>(
511 &self,
512 wait: W,
513 ring: R,
514 flags: ReceiveFlags,
515 ) -> Result<QueueEntry<T>, QueueError> {
516 let t = self
517 .hdr()
518 .get_next_ready(wait, flags, unsafe { *self.buf.get() })?;
519 let buf_item = self.get_buf(t as usize);
520 let item = unsafe { buf_item.read() };
521 self.hdr().advance_tail(ring);
522 Ok(item)
523 }
524
525 pub fn has_pending(&self) -> bool {
526 self.hdr().has_pending(unsafe { *self.buf.get() })
527 }
528
529 pub fn has_space(&self) -> bool {
530 self.hdr().has_space::<T>()
531 }
532
533 pub fn setup_sleep<'a>(
534 &'a self,
535 sleep: bool,
536 output: &mut Option<QueueEntry<T>>,
537 waiter: &mut (Option<&'a AtomicU64>, u64),
538 ringer: &mut Option<&'a AtomicU64>,
539 ) -> Result<(), QueueError> {
540 let t = self
541 .hdr()
542 .setup_rec_sleep(sleep, unsafe { *self.buf.get() }, waiter)?;
543 let buf_item = self.get_buf(t as usize);
544 let item = unsafe { buf_item.read() };
545 *output = Some(item);
546 self.hdr().advance_tail_setup(ringer);
547 Ok(())
548 }
549
550 #[inline]
551 pub fn setup_sleep_simple(&self) -> (&AtomicU64, u64) {
552 self.hdr().setup_rec_sleep_simple()
553 }
554
555 #[inline]
556 pub fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) {
557 self.hdr().setup_send_sleep_simple()
558 }
559}
560
// `RawQueue` holds raw pointers, so it is neither `Send` nor `Sync` automatically; both are
// asserted by hand here for payloads that are themselves `Send`.
// NOTE(review): soundness relies on the queue's atomic indices and control words serialising
// access to each entry, and on the pointers given to `RawQueue::new` being usable from any
// thread -- confirm against the callers.
unsafe impl<T: Send> Send for RawQueue<T> {}
unsafe impl<T: Send> Sync for RawQueue<T> {}
563
/// Receives from several queues at once.
///
/// Every queue in `queues` is probed without blocking. `output[i]` is set to `Some(entry)` when
/// `queues[i]` yielded an entry and to `None` otherwise, so on success the number of `Some`
/// slots equals the returned count.
///
/// * `multi_wait` is called with one `(word, value)` pair per queue once the polling budget is
///   used up and nothing was ready; it is expected to return after one of the words stops
///   holding its paired value.
/// * `multi_ring` is called once, before returning successfully, with one slot per queue; a
///   slot is `Some(word)` when that queue has submitters registered as waiting on `word`.
///
/// # Errors
/// * `QueueError::Unknown` if `queues` and `output` differ in length.
/// * `QueueError::WouldBlock` if nothing was ready and `flags` contains `NON_BLOCK`.
#[cfg(any(feature = "std", test))]
pub fn multi_receive<T: Copy, W: Fn(&[(Option<&AtomicU64>, u64)]), R: Fn(&[Option<&AtomicU64>])>(
    queues: &[&RawQueue<T>],
    output: &mut [Option<QueueEntry<T>>],
    multi_wait: W,
    multi_ring: R,
    flags: ReceiveFlags,
) -> Result<usize, QueueError> {
    if output.len() != queues.len() {
        return Err(QueueError::Unknown);
    }

    // Drop whatever the caller left in the slots, so a stale `Some` cannot be mistaken for an
    // entry received by this call.
    for slot in output.iter_mut() {
        *slot = None;
    }

    let mut waiters = vec![(None, 0); queues.len()];
    let mut ringers = vec![None; queues.len()];
    let mut attempts = 100;
    loop {
        // Only ask the queues to flag the consumer as waiting once polling has been exhausted.
        let sleep = attempts == 0;
        let mut count = 0;
        for (i, q) in queues.iter().enumerate() {
            let res = q.setup_sleep(sleep, &mut output[i], &mut waiters[i], &mut ringers[i]);
            if res.is_ok() {
                count += 1;
            }
        }
        if count > 0 {
            multi_ring(&ringers);
            return Ok(count);
        }
        if flags.contains(ReceiveFlags::NON_BLOCK) {
            return Err(QueueError::WouldBlock);
        }
        if attempts > 0 {
            attempts -= 1;
            core::hint::spin_loop();
        } else {
            multi_wait(&waiters);
        }
    }
}
631
#[cfg(test)]
mod tests {
    #![allow(soft_unstable)]
    use std::sync::atomic::{AtomicU64, Ordering};

    use crate::multi_receive;
    use crate::{QueueEntry, QueueError, RawQueue, RawQueueHdr, ReceiveFlags, SubmissionFlags};

    /// Busy-waits until `x` no longer holds `v`.
    fn wait(x: &AtomicU64, v: u64) {
        while x.load(Ordering::SeqCst) == v {
            core::hint::spin_loop();
        }
    }

    /// Nothing to do on wake-up: `wait` polls the word itself.
    fn wake(_x: &AtomicU64) {}

    /// Entries come back in order with their info word and payload intact, across several laps
    /// of a 16-entry buffer.
    #[test]
    fn it_transmits() {
        // The stride is taken from the same entry type the buffer holds.
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        for i in 0..100 {
            let res = q.submit(
                QueueEntry::new(i as u32, i * 10),
                wait,
                wake,
                SubmissionFlags::empty(),
            );
            assert_eq!(res, Ok(()));
            let res = q.receive(wait, wake, ReceiveFlags::empty());
            assert!(res.is_ok());
            assert_eq!(res.unwrap().info(), i as u32);
            assert_eq!(res.unwrap().item(), i * 10);
        }
    }

    /// A 4-entry queue accepts four submissions and rejects a fifth non-blocking one.
    #[test]
    fn it_fills() {
        let qh = RawQueueHdr::new(2, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 2];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(2, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(3, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(QueueEntry::new(4, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.submit(
            QueueEntry::new(1, 7),
            wait,
            wake,
            SubmissionFlags::NON_BLOCK,
        );
        assert_eq!(res, Err(QueueError::WouldBlock));
    }

    /// A non-blocking receive on a drained queue reports `WouldBlock`.
    #[test]
    fn it_nonblock_receives() {
        let qh = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer = [QueueEntry::<i32>::default(); 1 << 4];
        let q = unsafe { RawQueue::new(&qh, buffer.as_mut_ptr()) };

        let res = q.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q.receive(wait, wake, ReceiveFlags::empty());
        assert!(res.is_ok());
        assert_eq!(res.unwrap().info(), 1);
        assert_eq!(res.unwrap().item(), 7);
        let res = q.receive(wait, wake, ReceiveFlags::NON_BLOCK);
        assert_eq!(res.unwrap_err(), QueueError::WouldBlock);
    }

    /// `multi_receive` collects one entry from each of two queues in a single call.
    #[test]
    fn it_multi_receives() {
        let qh1 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer1 = [QueueEntry::<i32>::default(); 1 << 4];
        let q1 = unsafe { RawQueue::new(&qh1, buffer1.as_mut_ptr()) };

        let qh2 = RawQueueHdr::new(4, std::mem::size_of::<QueueEntry<i32>>());
        let mut buffer2 = [QueueEntry::<i32>::default(); 1 << 4];
        let q2 = unsafe { RawQueue::new(&qh2, buffer2.as_mut_ptr()) };

        let res = q1.submit(QueueEntry::new(1, 7), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));
        let res = q2.submit(QueueEntry::new(2, 8), wait, wake, SubmissionFlags::empty());
        assert_eq!(res, Ok(()));

        let mut output = [None, None];
        let res = multi_receive(
            &[&q1, &q2],
            &mut output,
            |_| {},
            |_| {},
            ReceiveFlags::empty(),
        );
        assert_eq!(res, Ok(2));
        assert_eq!(output[0].unwrap().info(), 1);
        assert_eq!(output[0].unwrap().item(), 7);
        assert_eq!(output[1].unwrap().info(), 2);
        assert_eq!(output[1].unwrap().item(), 8);
    }
}
741 }