twizzler_queue/
callback_queue.rs1use std::{future::Future, pin::Pin};
2
3use async_io::Async;
4use twizzler_queue_raw::{ReceiveFlags, SubmissionFlags};
5
6use crate::Queue;
7
8struct CallbackQueueReceiverInner<S, C> {
9 queue: Queue<S, C>,
10}
11
12pub struct CallbackQueueReceiver<S, C> {
14 inner: Async<Pin<Box<CallbackQueueReceiverInner<S, C>>>>,
15}
16
17impl<S: Copy + Send + Sync, C: Copy + Send + Sync> twizzler_futures::TwizzlerWaitable
18 for CallbackQueueReceiverInner<S, C>
19{
20 fn wait_item_read(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
21 (
22 self.queue.setup_read_sub_sleep(),
23 self.queue.has_pending_submission(),
24 )
25 }
26
27 fn wait_item_write(&self) -> (twizzler_abi::syscall::ThreadSyncSleep, bool) {
28 (
29 self.queue.setup_write_com_sleep(),
30 self.queue.has_com_space(),
31 )
32 }
33}
34
35impl<S: Copy + Send + Sync + 'static, C: Copy + Send + Sync + 'static> CallbackQueueReceiver<S, C> {
54 pub fn new(queue: Queue<S, C>) -> Self {
56 Self {
57 inner: Async::new_pin(CallbackQueueReceiverInner { queue }).unwrap(),
58 }
59 }
60
61 pub async fn handle<F, Fut>(&self, f: F) -> Result<(), std::io::Error>
63 where
64 F: FnOnce(u32, S) -> Fut,
65 Fut: Future<Output = C>,
66 {
67 let (id, item) = self
68 .inner
69 .read_with(|inner| {
70 inner
71 .queue
72 .receive(ReceiveFlags::NON_BLOCK)
73 .map_err(|e| e.into())
74 })
75 .await?;
76 let reply = f(id, item).await;
77 self.inner
78 .write_with(|inner| {
79 inner
80 .queue
81 .complete(id, reply, SubmissionFlags::NON_BLOCK)
82 .map_err(|e| e.into())
83 })
84 .await?;
85 Ok(())
86 }
87
88 pub async fn receive(&self) -> Result<(u32, S), std::io::Error> {
90 let r = self
91 .inner
92 .read_with(|inner| {
93 inner
94 .queue
95 .receive(ReceiveFlags::NON_BLOCK)
96 .map_err(|e| e.into())
97 })
98 .await;
99 r
100 }
101
102 pub async fn complete(&self, id: u32, reply: C) -> Result<(), std::io::Error> {
104 let r = self
105 .inner
106 .write_with(|inner| {
107 inner
108 .queue
109 .complete(id, reply, SubmissionFlags::NON_BLOCK)
110 .map_err(|e| e.into())
111 })
112 .await;
113 r
114 }
115}