twizzler_queue/
queue.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2
3use twizzler_abi::{
4    object::NULLPAGE_SIZE,
5    syscall::{
6        sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
7        ThreadSyncSleep, ThreadSyncWake,
8    },
9};
10pub use twizzler_queue_raw::{QueueBase, QueueError, ReceiveFlags, SubmissionFlags};
11use twizzler_queue_raw::{QueueEntry, RawQueue, RawQueueHdr};
12use twizzler_rt_abi::object::ObjectHandle;
13
14/// A single queue, holding two subqueues (sending and completion). Objects of type S are sent
15/// across the sending queue, and completions of type C are sent back.
16pub struct Queue<S, C> {
17    submission: RawQueue<S>,
18    completion: RawQueue<C>,
19    sub_rec_count: AtomicBool,
20    com_rec_count: AtomicBool,
21    object: ObjectHandle,
22}
23
24fn base<S, C>(obj: &ObjectHandle) -> &mut QueueBase<S, C> {
25    unsafe {
26        obj.start()
27            .add(NULLPAGE_SIZE)
28            .cast::<QueueBase<S, C>>()
29            .as_mut()
30            .unwrap()
31    }
32}
33
34fn get_raw_sub<S: Copy, C>(obj: &ObjectHandle) -> RawQueue<S> {
35    let base = base::<S, C>(obj);
36    unsafe {
37        let hdr = obj.start().add(base.sub_hdr).cast();
38        let buf = obj.start().add(base.sub_buf).cast();
39        RawQueue::new(hdr, buf)
40    }
41}
42
43fn get_raw_com<S, C: Copy>(obj: &ObjectHandle) -> RawQueue<C> {
44    let base = base::<S, C>(obj);
45    unsafe {
46        let hdr = obj.start().add(base.com_hdr).cast();
47        let buf = obj.start().add(base.com_buf).cast();
48        RawQueue::new(hdr, buf)
49    }
50}
51
52impl<S: Copy, C: Copy> From<ObjectHandle> for Queue<S, C> {
53    fn from(x: ObjectHandle) -> Self {
54        Self {
55            submission: get_raw_sub::<S, C>(&x),
56            completion: get_raw_com::<S, C>(&x),
57            sub_rec_count: AtomicBool::new(false),
58            com_rec_count: AtomicBool::new(false),
59            object: x,
60        }
61    }
62}
63
64fn wait(pt: &AtomicU64, val: u64) {
65    let op = ThreadSync::new_sleep(ThreadSyncSleep::new(
66        ThreadSyncReference::Virtual(pt as *const AtomicU64),
67        val,
68        ThreadSyncOp::Equal,
69        ThreadSyncFlags::empty(),
70    ));
71    let _ = sys_thread_sync(&mut [op], None);
72}
73
74fn ring(pt: &AtomicU64) {
75    let op = ThreadSync::new_wake(ThreadSyncWake::new(
76        ThreadSyncReference::Virtual(pt as *const AtomicU64),
77        usize::MAX,
78    ));
79    let _ = sys_thread_sync(&mut [op], None);
80}
81
82impl<S: Copy, C: Copy> Queue<S, C> {
83    /// Get a handle to the internal object that holds the queue data.
84    pub fn handle(&self) -> &ObjectHandle {
85        &self.object
86    }
87
88    pub fn com_hdr(&self) -> &RawQueueHdr {
89        self.completion.hdr()
90    }
91
92    pub fn sub_hdr(&self) -> &RawQueueHdr {
93        self.submission.hdr()
94    }
95
96    pub fn has_pending_submission(&self) -> bool {
97        self.submission.has_pending()
98    }
99
100    pub fn has_pending_completion(&self) -> bool {
101        self.completion.has_pending()
102    }
103
104    pub fn has_sub_space(&self) -> bool {
105        self.submission.has_space()
106    }
107
108    pub fn has_com_space(&self) -> bool {
109        self.completion.has_space()
110    }
111
112    /// Create a new Twizzler queue object.
113    pub fn init(obj: &ObjectHandle, sub_queue_len: usize, com_queue_len: usize) {
114        const HDR_LEN: usize = 0x1000;
115        // TODO: verify things
116        let sub_len = (core::mem::size_of::<S>() * sub_queue_len) * 2;
117        //let com_len = (core::mem::size_of::<C>() * com_queue_len) * 2;
118        let (sub_hdr, com_hdr) = {
119            let base: &mut QueueBase<S, C> = unsafe {
120                obj.start()
121                    .add(NULLPAGE_SIZE)
122                    .cast::<QueueBase<S, C>>()
123                    .as_mut()
124                    .unwrap()
125            };
126            base.sub_hdr = NULLPAGE_SIZE + HDR_LEN;
127            base.com_hdr = base.sub_hdr + HDR_LEN;
128            base.sub_buf = base.com_hdr + HDR_LEN;
129            base.com_buf = base.sub_buf + sub_len;
130            (base.sub_hdr, base.com_hdr)
131        };
132        unsafe {
133            let srq: *mut RawQueueHdr = obj.start().add(sub_hdr).cast();
134            let crq: *mut RawQueueHdr = obj.start().add(com_hdr).cast();
135            let l2len = sub_queue_len.next_power_of_two().ilog2();
136            srq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<S>()));
137            let l2len = com_queue_len.next_power_of_two().ilog2();
138            crq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<C>()));
139        }
140    }
141
142    fn with_guard<R>(&self, sub: bool, f: impl FnOnce() -> R) -> R {
143        let guard = if sub {
144            &self.sub_rec_count
145        } else {
146            &self.com_rec_count
147        };
148        if guard.swap(true, Ordering::SeqCst) {
149            panic!("cannot call queue receive operations from multiple concurrent threads");
150        }
151        let res = f();
152        guard.store(false, Ordering::SeqCst);
153        res
154    }
155
156    /// Submit an item of type S across the sending subqueue, with a given id.
157    pub fn submit(&self, id: u32, item: S, flags: SubmissionFlags) -> Result<(), QueueError> {
158        self.submission
159            .submit(QueueEntry::new(id, item), wait, ring, flags)
160    }
161
162    /// Receive an item and request id from the sending subqueue.
163    pub fn receive(&self, flags: ReceiveFlags) -> Result<(u32, S), QueueError> {
164        self.with_guard(true, || self.submission.receive(wait, ring, flags))
165            .map(|qe| (qe.info(), qe.item()))
166    }
167
168    /// Submit a completion item of type C across the completion subqueue.
169    pub fn complete(&self, id: u32, item: C, flags: SubmissionFlags) -> Result<(), QueueError> {
170        self.completion
171            .submit(QueueEntry::new(id, item), wait, ring, flags)
172    }
173
174    /// Receive a completion item and id from the completion subqueue.
175    pub fn get_completion(&self, flags: ReceiveFlags) -> Result<(u32, C), QueueError> {
176        self.with_guard(false, || self.completion.receive(wait, ring, flags))
177            .map(|qe| (qe.info(), qe.item()))
178    }
179
180    #[inline]
181    fn build_thread_sync(ptr: &AtomicU64, val: u64) -> ThreadSyncSleep {
182        ThreadSyncSleep::new(
183            ThreadSyncReference::Virtual(ptr as *const AtomicU64),
184            val,
185            ThreadSyncOp::Equal,
186            ThreadSyncFlags::empty(),
187        )
188    }
189
190    /// Setup a sleep operation for reading the completion subqueue.
191    #[inline]
192    pub fn setup_read_com_sleep(&self) -> ThreadSyncSleep {
193        let (ptr, val) = self.completion.setup_sleep_simple();
194        Self::build_thread_sync(ptr, val)
195    }
196
197    /// Setup a sleep operation for reading the sending subqueue.
198    #[inline]
199    pub fn setup_read_sub_sleep(&self) -> ThreadSyncSleep {
200        let (ptr, val) = self.submission.setup_sleep_simple();
201        Self::build_thread_sync(ptr, val)
202    }
203
204    /// Setup a sleep operation for writing the sending subqueue.
205    #[inline]
206    pub fn setup_write_sub_sleep(&self) -> ThreadSyncSleep {
207        let (ptr, val) = self.submission.setup_send_sleep_simple();
208        Self::build_thread_sync(ptr, val)
209    }
210
211    /// Setup a sleep operation for writing the completion subqueue.
212    #[inline]
213    pub fn setup_write_com_sleep(&self) -> ThreadSyncSleep {
214        let (ptr, val) = self.completion.setup_send_sleep_simple();
215        Self::build_thread_sync(ptr, val)
216    }
217}