twizzler_queue/
callback_queue.rs

1use std::{future::Future, pin::Pin};
2
3use async_io::Async;
4use twizzler_queue_raw::{ReceiveFlags, SubmissionFlags};
5
6use crate::Queue;
7
8struct CallbackQueueReceiverInner<S, C> {
9    queue: Queue<S, C>,
10}
11
12/// A receiver-side async-enabled queue abstraction.
13pub struct CallbackQueueReceiver<S, C> {
14    inner: Async<Pin<Box<CallbackQueueReceiverInner<S, C>>>>,
15}
16
17impl<S: Copy + Send + Sync, C: Copy + Send + Sync> twizzler_futures::TwizzlerWaitable
18    for CallbackQueueReceiverInner<S, C>
19{
20    fn wait_item_read(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
21        self.queue.setup_read_sub_sleep()
22    }
23
24    fn wait_item_write(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
25        self.queue.setup_write_com_sleep()
26    }
27}
28
29/*
30impl<S: Copy, C: Copy> AsyncDuplexSetup for CallbackQueueReceiverInner<S, C> {
31    type ReadError = QueueError;
32    type WriteError = QueueError;
33
34    const READ_WOULD_BLOCK: Self::ReadError = QueueError::WouldBlock;
35    const WRITE_WOULD_BLOCK: Self::WriteError = QueueError::WouldBlock;
36
37    fn setup_read_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
38        self.queue.setup_read_sub_sleep()
39    }
40
41    fn setup_write_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
42        self.queue.setup_write_com_sleep()
43    }
44}
45*/
46
47impl<S: Copy + Send + Sync, C: Copy + Send + Sync> CallbackQueueReceiver<S, C> {
48    /// Create a new CallbackQueueReceiver from a [Queue].
49    pub fn new(queue: Queue<S, C>) -> Self {
50        Self {
51            inner: Async::new(CallbackQueueReceiverInner { queue }).unwrap(),
52        }
53    }
54
55    /// Handle a request in a closure that returns a completion.
56    pub async fn handle<F, Fut>(&self, f: F) -> Result<(), std::io::Error>
57    where
58        F: FnOnce(u32, S) -> Fut,
59        Fut: Future<Output = C>,
60    {
61        let (id, item) = self
62            .inner
63            .read_with(|inner| {
64                inner
65                    .queue
66                    .receive(ReceiveFlags::NON_BLOCK)
67                    .map_err(|e| e.into())
68            })
69            .await?;
70        let reply = f(id, item).await;
71        self.inner
72            .write_with(|inner| {
73                inner
74                    .queue
75                    .complete(id, reply, SubmissionFlags::NON_BLOCK)
76                    .map_err(|e| e.into())
77            })
78            .await?;
79        Ok(())
80    }
81
82    /// Receive a request without immediately returning a completion.
83    pub async fn receive(&self) -> Result<(u32, S), std::io::Error> {
84        let r = self
85            .inner
86            .read_with(|inner| {
87                inner
88                    .queue
89                    .receive(ReceiveFlags::NON_BLOCK)
90                    .map_err(|e| e.into())
91            })
92            .await;
93        r
94    }
95
96    /// Send a completion back to the sender.
97    pub async fn complete(&self, id: u32, reply: C) -> Result<(), std::io::Error> {
98        let r = self
99            .inner
100            .write_with(|inner| {
101                inner
102                    .queue
103                    .complete(id, reply, SubmissionFlags::NON_BLOCK)
104                    .map_err(|e| e.into())
105            })
106            .await;
107        r
108    }
109}