1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2
3use twizzler_abi::{
4 object::NULLPAGE_SIZE,
5 syscall::{
6 sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
7 ThreadSyncSleep, ThreadSyncWake,
8 },
9};
10pub use twizzler_queue_raw::{QueueBase, QueueError, ReceiveFlags, SubmissionFlags};
11use twizzler_queue_raw::{QueueEntry, RawQueue, RawQueueHdr};
12use twizzler_rt_abi::object::ObjectHandle;
13
14pub struct Queue<S, C> {
17 submission: RawQueue<S>,
18 completion: RawQueue<C>,
19 sub_rec_count: AtomicBool,
20 com_rec_count: AtomicBool,
21 object: ObjectHandle,
22}
23
24fn base<S, C>(obj: &ObjectHandle) -> &mut QueueBase<S, C> {
25 unsafe {
26 obj.start()
27 .add(NULLPAGE_SIZE)
28 .cast::<QueueBase<S, C>>()
29 .as_mut()
30 .unwrap()
31 }
32}
33
34fn get_raw_sub<S: Copy, C>(obj: &ObjectHandle) -> RawQueue<S> {
35 let base = base::<S, C>(obj);
36 unsafe {
37 let hdr = obj.start().add(base.sub_hdr).cast();
38 let buf = obj.start().add(base.sub_buf).cast();
39 RawQueue::new(hdr, buf)
40 }
41}
42
43fn get_raw_com<S, C: Copy>(obj: &ObjectHandle) -> RawQueue<C> {
44 let base = base::<S, C>(obj);
45 unsafe {
46 let hdr = obj.start().add(base.com_hdr).cast();
47 let buf = obj.start().add(base.com_buf).cast();
48 RawQueue::new(hdr, buf)
49 }
50}
51
52impl<S: Copy, C: Copy> From<ObjectHandle> for Queue<S, C> {
53 fn from(x: ObjectHandle) -> Self {
54 Self {
55 submission: get_raw_sub::<S, C>(&x),
56 completion: get_raw_com::<S, C>(&x),
57 sub_rec_count: AtomicBool::new(false),
58 com_rec_count: AtomicBool::new(false),
59 object: x,
60 }
61 }
62}
63
64fn wait(pt: &AtomicU64, val: u64) {
65 let op = ThreadSync::new_sleep(ThreadSyncSleep::new(
66 ThreadSyncReference::Virtual(pt as *const AtomicU64),
67 val,
68 ThreadSyncOp::Equal,
69 ThreadSyncFlags::empty(),
70 ));
71 let _ = sys_thread_sync(&mut [op], None);
72}
73
74fn ring(pt: &AtomicU64) {
75 let op = ThreadSync::new_wake(ThreadSyncWake::new(
76 ThreadSyncReference::Virtual(pt as *const AtomicU64),
77 usize::MAX,
78 ));
79 let _ = sys_thread_sync(&mut [op], None);
80}
81
82impl<S: Copy, C: Copy> Queue<S, C> {
83 pub fn handle(&self) -> &ObjectHandle {
85 &self.object
86 }
87
88 pub fn com_hdr(&self) -> &RawQueueHdr {
89 self.completion.hdr()
90 }
91
92 pub fn sub_hdr(&self) -> &RawQueueHdr {
93 self.submission.hdr()
94 }
95
96 pub fn has_pending_submission(&self) -> bool {
97 self.submission.has_pending()
98 }
99
100 pub fn has_pending_completion(&self) -> bool {
101 self.completion.has_pending()
102 }
103
104 pub fn has_sub_space(&self) -> bool {
105 self.submission.has_space()
106 }
107
108 pub fn has_com_space(&self) -> bool {
109 self.completion.has_space()
110 }
111
112 pub fn init(obj: &ObjectHandle, sub_queue_len: usize, com_queue_len: usize) {
114 const HDR_LEN: usize = 0x1000;
115 let sub_len = (core::mem::size_of::<S>() * sub_queue_len) * 2;
117 let (sub_hdr, com_hdr) = {
119 let base: &mut QueueBase<S, C> = unsafe {
120 obj.start()
121 .add(NULLPAGE_SIZE)
122 .cast::<QueueBase<S, C>>()
123 .as_mut()
124 .unwrap()
125 };
126 base.sub_hdr = NULLPAGE_SIZE + HDR_LEN;
127 base.com_hdr = base.sub_hdr + HDR_LEN;
128 base.sub_buf = base.com_hdr + HDR_LEN;
129 base.com_buf = base.sub_buf + sub_len;
130 (base.sub_hdr, base.com_hdr)
131 };
132 unsafe {
133 let srq: *mut RawQueueHdr = obj.start().add(sub_hdr).cast();
134 let crq: *mut RawQueueHdr = obj.start().add(com_hdr).cast();
135 let l2len = sub_queue_len.next_power_of_two().ilog2();
136 srq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<S>()));
137 let l2len = com_queue_len.next_power_of_two().ilog2();
138 crq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<C>()));
139 }
140 }
141
142 fn with_guard<R>(&self, sub: bool, f: impl FnOnce() -> R) -> R {
143 let guard = if sub {
144 &self.sub_rec_count
145 } else {
146 &self.com_rec_count
147 };
148 if guard.swap(true, Ordering::SeqCst) {
149 panic!("cannot call queue receive operations from multiple concurrent threads");
150 }
151 let res = f();
152 guard.store(false, Ordering::SeqCst);
153 res
154 }
155
156 pub fn submit(&self, id: u32, item: S, flags: SubmissionFlags) -> Result<(), QueueError> {
158 self.submission
159 .submit(QueueEntry::new(id, item), wait, ring, flags)
160 }
161
162 pub fn receive(&self, flags: ReceiveFlags) -> Result<(u32, S), QueueError> {
164 self.with_guard(true, || self.submission.receive(wait, ring, flags))
165 .map(|qe| (qe.info(), qe.item()))
166 }
167
168 pub fn complete(&self, id: u32, item: C, flags: SubmissionFlags) -> Result<(), QueueError> {
170 self.completion
171 .submit(QueueEntry::new(id, item), wait, ring, flags)
172 }
173
174 pub fn get_completion(&self, flags: ReceiveFlags) -> Result<(u32, C), QueueError> {
176 self.with_guard(false, || self.completion.receive(wait, ring, flags))
177 .map(|qe| (qe.info(), qe.item()))
178 }
179
180 #[inline]
181 fn build_thread_sync(ptr: &AtomicU64, val: u64) -> ThreadSyncSleep {
182 ThreadSyncSleep::new(
183 ThreadSyncReference::Virtual(ptr as *const AtomicU64),
184 val,
185 ThreadSyncOp::Equal,
186 ThreadSyncFlags::empty(),
187 )
188 }
189
190 #[inline]
192 pub fn setup_read_com_sleep(&self) -> ThreadSyncSleep {
193 let (ptr, val) = self.completion.setup_sleep_simple();
194 Self::build_thread_sync(ptr, val)
195 }
196
197 #[inline]
199 pub fn setup_read_sub_sleep(&self) -> ThreadSyncSleep {
200 let (ptr, val) = self.submission.setup_sleep_simple();
201 Self::build_thread_sync(ptr, val)
202 }
203
204 #[inline]
206 pub fn setup_write_sub_sleep(&self) -> ThreadSyncSleep {
207 let (ptr, val) = self.submission.setup_send_sleep_simple();
208 Self::build_thread_sync(ptr, val)
209 }
210
211 #[inline]
213 pub fn setup_write_com_sleep(&self) -> ThreadSyncSleep {
214 let (ptr, val) = self.completion.setup_send_sleep_simple();
215 Self::build_thread_sync(ptr, val)
216 }
217}