1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2
3use twizzler_abi::{
4 object::NULLPAGE_SIZE,
5 syscall::{
6 sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
7 ThreadSyncSleep, ThreadSyncWake,
8 },
9};
10pub use twizzler_queue_raw::{QueueBase, QueueError, ReceiveFlags, SubmissionFlags};
11use twizzler_queue_raw::{QueueEntry, RawQueue, RawQueueHdr};
12use twizzler_rt_abi::object::ObjectHandle;
13
14pub struct Queue<S, C> {
17 submission: RawQueue<S>,
18 completion: RawQueue<C>,
19 sub_rec_count: AtomicBool,
20 com_rec_count: AtomicBool,
21 object: ObjectHandle,
22}
23
24fn base<S, C>(obj: &ObjectHandle) -> &mut QueueBase<S, C> {
25 unsafe {
26 obj.start()
27 .add(NULLPAGE_SIZE)
28 .cast::<QueueBase<S, C>>()
29 .as_mut()
30 .unwrap()
31 }
32}
33
34fn get_raw_sub<S: Copy, C>(obj: &ObjectHandle) -> RawQueue<S> {
35 let base = base::<S, C>(obj);
36 unsafe {
37 let hdr = obj.start().add(base.sub_hdr).cast();
38 let buf = obj.start().add(base.sub_buf).cast();
39 RawQueue::new(hdr, buf)
40 }
41}
42
43fn get_raw_com<S, C: Copy>(obj: &ObjectHandle) -> RawQueue<C> {
44 let base = base::<S, C>(obj);
45 unsafe {
46 let hdr = obj.start().add(base.com_hdr).cast();
47 let buf = obj.start().add(base.com_buf).cast();
48 RawQueue::new(hdr, buf)
49 }
50}
51
52impl<S: Copy, C: Copy> From<ObjectHandle> for Queue<S, C> {
53 fn from(x: ObjectHandle) -> Self {
54 Self {
55 submission: get_raw_sub::<S, C>(&x),
56 completion: get_raw_com::<S, C>(&x),
57 sub_rec_count: AtomicBool::new(false),
58 com_rec_count: AtomicBool::new(false),
59 object: x,
60 }
61 }
62}
63
64fn wait(pt: &AtomicU64, val: u64) {
65 let op = ThreadSync::new_sleep(ThreadSyncSleep::new(
66 ThreadSyncReference::Virtual(pt as *const AtomicU64),
67 val,
68 ThreadSyncOp::Equal,
69 ThreadSyncFlags::empty(),
70 ));
71 let _ = sys_thread_sync(&mut [op], None);
72}
73
74fn ring(pt: &AtomicU64) {
75 let op = ThreadSync::new_wake(ThreadSyncWake::new(
76 ThreadSyncReference::Virtual(pt as *const AtomicU64),
77 usize::MAX,
78 ));
79 let _ = sys_thread_sync(&mut [op], None);
80}
81
82impl<S: Copy, C: Copy> Queue<S, C> {
83 pub fn handle(&self) -> &ObjectHandle {
85 &self.object
86 }
87
88 pub fn init(obj: &ObjectHandle, sub_queue_len: usize, com_queue_len: usize) {
90 const HDR_LEN: usize = 0x1000;
91 let sub_len = (core::mem::size_of::<S>() * sub_queue_len) * 2;
93 let (sub_hdr, com_hdr) = {
95 let base: &mut QueueBase<S, C> = unsafe {
96 obj.start()
97 .add(NULLPAGE_SIZE)
98 .cast::<QueueBase<S, C>>()
99 .as_mut()
100 .unwrap()
101 };
102 base.sub_hdr = NULLPAGE_SIZE + HDR_LEN;
103 base.com_hdr = base.sub_hdr + HDR_LEN;
104 base.sub_buf = base.com_hdr + HDR_LEN;
105 base.com_buf = base.sub_buf + sub_len;
106 (base.sub_hdr, base.com_hdr)
107 };
108 unsafe {
109 let srq: *mut RawQueueHdr = obj.start().add(sub_hdr).cast();
110 let crq: *mut RawQueueHdr = obj.start().add(com_hdr).cast();
111 let l2len = sub_queue_len.next_power_of_two().ilog2();
112 srq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<S>()));
113 let l2len = com_queue_len.next_power_of_two().ilog2();
114 crq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<C>()));
115 }
116 }
117
118 fn with_guard<R>(&self, sub: bool, f: impl FnOnce() -> R) -> R {
119 let guard = if sub {
120 &self.sub_rec_count
121 } else {
122 &self.com_rec_count
123 };
124 if guard.swap(true, Ordering::SeqCst) {
125 panic!("cannot call queue receive operations from multiple concurrent threads");
126 }
127 let res = f();
128 guard.store(false, Ordering::SeqCst);
129 res
130 }
131
132 pub fn submit(&self, id: u32, item: S, flags: SubmissionFlags) -> Result<(), QueueError> {
134 self.submission
135 .submit(QueueEntry::new(id, item), wait, ring, flags)
136 }
137
138 pub fn receive(&self, flags: ReceiveFlags) -> Result<(u32, S), QueueError> {
140 self.with_guard(true, || self.submission.receive(wait, ring, flags))
141 .map(|qe| (qe.info(), qe.item()))
142 }
143
144 pub fn complete(&self, id: u32, item: C, flags: SubmissionFlags) -> Result<(), QueueError> {
146 self.completion
147 .submit(QueueEntry::new(id, item), wait, ring, flags)
148 }
149
150 pub fn get_completion(&self, flags: ReceiveFlags) -> Result<(u32, C), QueueError> {
152 self.with_guard(false, || self.completion.receive(wait, ring, flags))
153 .map(|qe| (qe.info(), qe.item()))
154 }
155
156 #[inline]
157 fn build_thread_sync(ptr: &AtomicU64, val: u64) -> ThreadSyncSleep {
158 ThreadSyncSleep::new(
159 ThreadSyncReference::Virtual(ptr as *const AtomicU64),
160 val,
161 ThreadSyncOp::Equal,
162 ThreadSyncFlags::empty(),
163 )
164 }
165
166 #[inline]
168 pub fn setup_read_com_sleep(&self) -> ThreadSyncSleep {
169 let (ptr, val) = self.completion.setup_sleep_simple();
170 Self::build_thread_sync(ptr, val)
171 }
172
173 #[inline]
175 pub fn setup_read_sub_sleep(&self) -> ThreadSyncSleep {
176 let (ptr, val) = self.submission.setup_sleep_simple();
177 Self::build_thread_sync(ptr, val)
178 }
179
180 #[inline]
182 pub fn setup_write_sub_sleep(&self) -> ThreadSyncSleep {
183 let (ptr, val) = self.submission.setup_send_sleep_simple();
184 Self::build_thread_sync(ptr, val)
185 }
186
187 #[inline]
189 pub fn setup_write_com_sleep(&self) -> ThreadSyncSleep {
190 let (ptr, val) = self.completion.setup_send_sleep_simple();
191 Self::build_thread_sync(ptr, val)
192 }
193}