twizzler_queue/
callback_queue.rs1use std::{future::Future, pin::Pin};
2
3use async_io::Async;
4use twizzler_queue_raw::{ReceiveFlags, SubmissionFlags};
5
6use crate::Queue;
7
8struct CallbackQueueReceiverInner<S, C> {
9 queue: Queue<S, C>,
10}
11
12pub struct CallbackQueueReceiver<S, C> {
14 inner: Async<Pin<Box<CallbackQueueReceiverInner<S, C>>>>,
15}
16
17impl<S: Copy + Send + Sync, C: Copy + Send + Sync> twizzler_futures::TwizzlerWaitable
18 for CallbackQueueReceiverInner<S, C>
19{
20 fn wait_item_read(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
21 self.queue.setup_read_sub_sleep()
22 }
23
24 fn wait_item_write(&self) -> twizzler_abi::syscall::ThreadSyncSleep {
25 self.queue.setup_write_com_sleep()
26 }
27}
28
29impl<S: Copy + Send + Sync, C: Copy + Send + Sync> CallbackQueueReceiver<S, C> {
48 pub fn new(queue: Queue<S, C>) -> Self {
50 Self {
51 inner: Async::new(CallbackQueueReceiverInner { queue }).unwrap(),
52 }
53 }
54
55 pub async fn handle<F, Fut>(&self, f: F) -> Result<(), std::io::Error>
57 where
58 F: FnOnce(u32, S) -> Fut,
59 Fut: Future<Output = C>,
60 {
61 let (id, item) = self
62 .inner
63 .read_with(|inner| {
64 inner
65 .queue
66 .receive(ReceiveFlags::NON_BLOCK)
67 .map_err(|e| e.into())
68 })
69 .await?;
70 let reply = f(id, item).await;
71 self.inner
72 .write_with(|inner| {
73 inner
74 .queue
75 .complete(id, reply, SubmissionFlags::NON_BLOCK)
76 .map_err(|e| e.into())
77 })
78 .await?;
79 Ok(())
80 }
81
82 pub async fn receive(&self) -> Result<(u32, S), std::io::Error> {
84 let r = self
85 .inner
86 .read_with(|inner| {
87 inner
88 .queue
89 .receive(ReceiveFlags::NON_BLOCK)
90 .map_err(|e| e.into())
91 })
92 .await;
93 r
94 }
95
96 pub async fn complete(&self, id: u32, reply: C) -> Result<(), std::io::Error> {
98 let r = self
99 .inner
100 .write_with(|inner| {
101 inner
102 .queue
103 .complete(id, reply, SubmissionFlags::NON_BLOCK)
104 .map_err(|e| e.into())
105 })
106 .await;
107 r
108 }
109}