twizzler_queue/
queue.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2
3use twizzler_abi::{
4    object::NULLPAGE_SIZE,
5    syscall::{
6        sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
7        ThreadSyncSleep, ThreadSyncWake,
8    },
9};
10pub use twizzler_queue_raw::{QueueBase, QueueError, ReceiveFlags, SubmissionFlags};
11use twizzler_queue_raw::{QueueEntry, RawQueue, RawQueueHdr};
12use twizzler_rt_abi::object::ObjectHandle;
13
14/// A single queue, holding two subqueues (sending and completion). Objects of type S are sent
15/// across the sending queue, and completions of type C are sent back.
16pub struct Queue<S, C> {
17    submission: RawQueue<S>,
18    completion: RawQueue<C>,
19    sub_rec_count: AtomicBool,
20    com_rec_count: AtomicBool,
21    object: ObjectHandle,
22}
23
24fn base<S, C>(obj: &ObjectHandle) -> &mut QueueBase<S, C> {
25    unsafe {
26        obj.start()
27            .add(NULLPAGE_SIZE)
28            .cast::<QueueBase<S, C>>()
29            .as_mut()
30            .unwrap()
31    }
32}
33
34fn get_raw_sub<S: Copy, C>(obj: &ObjectHandle) -> RawQueue<S> {
35    let base = base::<S, C>(obj);
36    unsafe {
37        let hdr = obj.start().add(base.sub_hdr).cast();
38        let buf = obj.start().add(base.sub_buf).cast();
39        RawQueue::new(hdr, buf)
40    }
41}
42
43fn get_raw_com<S, C: Copy>(obj: &ObjectHandle) -> RawQueue<C> {
44    let base = base::<S, C>(obj);
45    unsafe {
46        let hdr = obj.start().add(base.com_hdr).cast();
47        let buf = obj.start().add(base.com_buf).cast();
48        RawQueue::new(hdr, buf)
49    }
50}
51
52impl<S: Copy, C: Copy> From<ObjectHandle> for Queue<S, C> {
53    fn from(x: ObjectHandle) -> Self {
54        Self {
55            submission: get_raw_sub::<S, C>(&x),
56            completion: get_raw_com::<S, C>(&x),
57            sub_rec_count: AtomicBool::new(false),
58            com_rec_count: AtomicBool::new(false),
59            object: x,
60        }
61    }
62}
63
64fn wait(pt: &AtomicU64, val: u64) {
65    let op = ThreadSync::new_sleep(ThreadSyncSleep::new(
66        ThreadSyncReference::Virtual(pt as *const AtomicU64),
67        val,
68        ThreadSyncOp::Equal,
69        ThreadSyncFlags::empty(),
70    ));
71    let _ = sys_thread_sync(&mut [op], None);
72}
73
74fn ring(pt: &AtomicU64) {
75    let op = ThreadSync::new_wake(ThreadSyncWake::new(
76        ThreadSyncReference::Virtual(pt as *const AtomicU64),
77        usize::MAX,
78    ));
79    let _ = sys_thread_sync(&mut [op], None);
80}
81
82impl<S: Copy, C: Copy> Queue<S, C> {
83    /// Get a handle to the internal object that holds the queue data.
84    pub fn handle(&self) -> &ObjectHandle {
85        &self.object
86    }
87
88    /// Create a new Twizzler queue object.
89    pub fn init(obj: &ObjectHandle, sub_queue_len: usize, com_queue_len: usize) {
90        const HDR_LEN: usize = 0x1000;
91        // TODO: verify things
92        let sub_len = (core::mem::size_of::<S>() * sub_queue_len) * 2;
93        //let com_len = (core::mem::size_of::<C>() * com_queue_len) * 2;
94        let (sub_hdr, com_hdr) = {
95            let base: &mut QueueBase<S, C> = unsafe {
96                obj.start()
97                    .add(NULLPAGE_SIZE)
98                    .cast::<QueueBase<S, C>>()
99                    .as_mut()
100                    .unwrap()
101            };
102            base.sub_hdr = NULLPAGE_SIZE + HDR_LEN;
103            base.com_hdr = base.sub_hdr + HDR_LEN;
104            base.sub_buf = base.com_hdr + HDR_LEN;
105            base.com_buf = base.sub_buf + sub_len;
106            (base.sub_hdr, base.com_hdr)
107        };
108        unsafe {
109            let srq: *mut RawQueueHdr = obj.start().add(sub_hdr).cast();
110            let crq: *mut RawQueueHdr = obj.start().add(com_hdr).cast();
111            let l2len = sub_queue_len.next_power_of_two().ilog2();
112            srq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<S>()));
113            let l2len = com_queue_len.next_power_of_two().ilog2();
114            crq.write(RawQueueHdr::new(l2len as usize, core::mem::size_of::<C>()));
115        }
116    }
117
118    fn with_guard<R>(&self, sub: bool, f: impl FnOnce() -> R) -> R {
119        let guard = if sub {
120            &self.sub_rec_count
121        } else {
122            &self.com_rec_count
123        };
124        if guard.swap(true, Ordering::SeqCst) {
125            panic!("cannot call queue receive operations from multiple concurrent threads");
126        }
127        let res = f();
128        guard.store(false, Ordering::SeqCst);
129        res
130    }
131
132    /// Submit an item of type S across the sending subqueue, with a given id.
133    pub fn submit(&self, id: u32, item: S, flags: SubmissionFlags) -> Result<(), QueueError> {
134        self.submission
135            .submit(QueueEntry::new(id, item), wait, ring, flags)
136    }
137
138    /// Receive an item and request id from the sending subqueue.
139    pub fn receive(&self, flags: ReceiveFlags) -> Result<(u32, S), QueueError> {
140        self.with_guard(true, || self.submission.receive(wait, ring, flags))
141            .map(|qe| (qe.info(), qe.item()))
142    }
143
144    /// Submit a completion item of type C across the completion subqueue.
145    pub fn complete(&self, id: u32, item: C, flags: SubmissionFlags) -> Result<(), QueueError> {
146        self.completion
147            .submit(QueueEntry::new(id, item), wait, ring, flags)
148    }
149
150    /// Receive a completion item and id from the completion subqueue.
151    pub fn get_completion(&self, flags: ReceiveFlags) -> Result<(u32, C), QueueError> {
152        self.with_guard(false, || self.completion.receive(wait, ring, flags))
153            .map(|qe| (qe.info(), qe.item()))
154    }
155
156    #[inline]
157    fn build_thread_sync(ptr: &AtomicU64, val: u64) -> ThreadSyncSleep {
158        ThreadSyncSleep::new(
159            ThreadSyncReference::Virtual(ptr as *const AtomicU64),
160            val,
161            ThreadSyncOp::Equal,
162            ThreadSyncFlags::empty(),
163        )
164    }
165
166    /// Setup a sleep operation for reading the completion subqueue.
167    #[inline]
168    pub fn setup_read_com_sleep(&self) -> ThreadSyncSleep {
169        let (ptr, val) = self.completion.setup_sleep_simple();
170        Self::build_thread_sync(ptr, val)
171    }
172
173    /// Setup a sleep operation for reading the sending subqueue.
174    #[inline]
175    pub fn setup_read_sub_sleep(&self) -> ThreadSyncSleep {
176        let (ptr, val) = self.submission.setup_sleep_simple();
177        Self::build_thread_sync(ptr, val)
178    }
179
180    /// Setup a sleep operation for writing the sending subqueue.
181    #[inline]
182    pub fn setup_write_sub_sleep(&self) -> ThreadSyncSleep {
183        let (ptr, val) = self.submission.setup_send_sleep_simple();
184        Self::build_thread_sync(ptr, val)
185    }
186
187    /// Setup a sleep operation for writing the completion subqueue.
188    #[inline]
189    pub fn setup_write_com_sleep(&self) -> ThreadSyncSleep {
190        let (ptr, val) = self.completion.setup_send_sleep_simple();
191        Self::build_thread_sync(ptr, val)
192    }
193}