twizzler_queue/
callback_queue.rs

1use std::{future::Future, pin::Pin};
2
3use async_io::Async;
4use twizzler_queue_raw::{ReceiveFlags, SubmissionFlags};
5
6use crate::Queue;
7
8struct CallbackQueueReceiverInner<S, C> {
9    queue: Queue<S, C>,
10}
11
12/// A receiver-side async-enabled queue abstraction.
13pub struct CallbackQueueReceiver<S, C> {
14    inner: Async<Pin<Box<CallbackQueueReceiverInner<S, C>>>>,
15}
16
17impl<S: Copy + Send + Sync, C: Copy + Send + Sync> twizzler_futures::TwizzlerWaitable
18    for CallbackQueueReceiverInner<S, C>
19{
20    fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
21        (
22            self.queue.setup_read_sub_sleep(),
23            self.queue.has_pending_submission(),
24        )
25    }
26
27    fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
28        (
29            self.queue.setup_write_com_sleep(),
30            self.queue.has_com_space(),
31        )
32    }
33}
34
35/*
36impl<S: Copy, C: Copy> AsyncDuplexSetup for CallbackQueueReceiverInner<S, C> {
37    type ReadError = QueueError;
38    type WriteError = QueueError;
39
40    const READ_WOULD_BLOCK: Self::ReadError = QueueError::WouldBlock;
41    const WRITE_WOULD_BLOCK: Self::WriteError = QueueError::WouldBlock;
42
43    fn setup_read_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
44        self.queue.setup_read_sub_sleep()
45    }
46
47    fn setup_write_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
48        self.queue.setup_write_com_sleep()
49    }
50}
51*/
52
53impl<S: Copy + Send + Sync + 'static, C: Copy + Send + Sync + 'static> CallbackQueueReceiver<S, C> {
54    /// Create a new CallbackQueueReceiver from a [Queue].
55    pub fn new(queue: Queue<S, C>) -> Self {
56        Self {
57            inner: Async::new_pin(CallbackQueueReceiverInner { queue }).unwrap(),
58        }
59    }
60
61    /// Handle a request in a closure that returns a completion.
62    pub async fn handle<F, Fut>(&self, f: F) -> Result<(), std::io::Error>
63    where
64        F: FnOnce(u32, S) -> Fut,
65        Fut: Future<Output = C>,
66    {
67        let (id, item) = self
68            .inner
69            .read_with(|inner| {
70                inner
71                    .queue
72                    .receive(ReceiveFlags::NON_BLOCK)
73                    .map_err(|e| e.into())
74            })
75            .await?;
76        let reply = f(id, item).await;
77        self.inner
78            .write_with(|inner| {
79                inner
80                    .queue
81                    .complete(id, reply, SubmissionFlags::NON_BLOCK)
82                    .map_err(|e| e.into())
83            })
84            .await?;
85        Ok(())
86    }
87
88    /// Receive a request without immediately returning a completion.
89    pub async fn receive(&self) -> Result<(u32, S), std::io::Error> {
90        let r = self
91            .inner
92            .read_with(|inner| {
93                inner
94                    .queue
95                    .receive(ReceiveFlags::NON_BLOCK)
96                    .map_err(|e| e.into())
97            })
98            .await;
99        r
100    }
101
102    /// Send a completion back to the sender.
103    pub async fn complete(&self, id: u32, reply: C) -> Result<(), std::io::Error> {
104        let r = self
105            .inner
106            .write_with(|inner| {
107                inner
108                    .queue
109                    .complete(id, reply, SubmissionFlags::NON_BLOCK)
110                    .map_err(|e| e.into())
111            })
112            .await;
113        r
114    }
115}