1use std::{
2 io::ErrorKind,
3 sync::atomic::{AtomicBool, AtomicU64, Ordering},
4};
5
6use twizzler::{
7 BaseType, Invariant,
8 object::{MapFlags, ObjID, Object, ObjectBuilder, TypedObject},
9};
10use twizzler_abi::syscall::{
11 ObjectCreate, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference, ThreadSyncSleep,
12 ThreadSyncWake, sys_thread_sync,
13};
14
15use crate::buffer::VolatileBuffer;
16
/// Capacity parameter, in bytes, of the [`VolatileBuffer`] embedded in every [`PipeBase`].
pub const BUF_SZ: usize = 4096;
18
/// Shared state stored as the base of a pipe object.
///
/// Every [`Pipe`] handle mapped onto the same object sees the same counters and buffer.
#[derive(Invariant, BaseType)]
pub struct PipeBase {
    /// Number of handles currently registered as readers; also used as a sleep/wake word.
    readers: AtomicU64,
    /// Number of handles currently registered as writers; also used as a sleep/wake word.
    writers: AtomicU64,
    /// Byte buffer that `Pipe::write` fills and `Pipe::read` drains.
    buffer: VolatileBuffer<BUF_SZ>,
}
25
26impl PipeBase {
27 pub fn new() -> Self {
28 Self {
29 readers: AtomicU64::new(1),
30 writers: AtomicU64::new(1),
31 buffer: VolatileBuffer::new(),
32 }
33 }
34}
35
/// A handle onto a shared pipe object.
///
/// The two flags record whether this particular handle currently contributes to the
/// `readers` / `writers` counts stored in the object's [`PipeBase`].
pub struct Pipe {
    /// The mapped object whose base holds the counters and the data buffer.
    pub pipe: Object<PipeBase>,
    /// `true` while this handle is counted in `PipeBase::readers`.
    reader: AtomicBool,
    /// `true` while this handle is counted in `PipeBase::writers`.
    writer: AtomicBool,
}
41
42impl Pipe {
43 pub fn create_object(spec: ObjectCreate) -> std::io::Result<Self> {
44 let obj = ObjectBuilder::new(spec).build(PipeBase::new())?;
45 Ok(Self {
46 pipe: obj,
47 reader: AtomicBool::new(true),
48 writer: AtomicBool::new(true),
49 })
50 }
51
52 pub fn open_object(id: ObjID) -> std::io::Result<Self> {
53 let obj =
54 unsafe { Object::<PipeBase>::map_unchecked(id, MapFlags::READ | MapFlags::WRITE) }?;
55 let this = Self {
56 pipe: obj,
57 reader: AtomicBool::new(true),
58 writer: AtomicBool::new(true),
59 };
60 this.increment_reader();
61 this.increment_writer();
62 Ok(this)
63 }
64
65 pub fn id(&self) -> ObjID {
66 self.pipe.id()
67 }
68
69 pub fn readers(&self) -> u64 {
70 self.pipe.base().readers.load(Ordering::SeqCst)
71 }
72
73 pub fn writers(&self) -> u64 {
74 self.pipe.base().writers.load(Ordering::SeqCst)
75 }
76
77 pub fn read_waitpoint(&self) -> ThreadSyncSleep {
78 self.pipe.base().buffer.sync_for_pending_data()
79 }
80
81 pub fn write_waitpoint(&self) -> ThreadSyncSleep {
82 self.pipe.base().buffer.sync_for_avail_space()
83 }
84
85 pub fn is_reader(&self) -> bool {
86 self.reader.load(Ordering::SeqCst)
87 }
88
89 pub fn is_writer(&self) -> bool {
90 self.writer.load(Ordering::SeqCst)
91 }
92
93 pub fn enable_reader(&self) {
94 if !self.reader.swap(true, Ordering::SeqCst) {
95 self.increment_reader();
96 }
97 }
98
99 pub fn increment_reader(&self) {
100 self.pipe.base().readers.fetch_add(1, Ordering::SeqCst);
101 let _ = sys_thread_sync(
102 &mut [ThreadSync::new_wake(ThreadSyncWake::new(
103 ThreadSyncReference::Virtual(&self.pipe.base().readers),
104 usize::MAX,
105 ))],
106 None,
107 )
108 .inspect_err(|e| tracing::warn!("failed to wake on readers: {e}"));
109 }
110
111 pub fn enable_writer(&self) {
112 if !self.writer.swap(true, Ordering::SeqCst) {
113 self.increment_writer();
114 }
115 }
116
117 pub fn increment_writer(&self) {
118 self.pipe.base().writers.fetch_add(1, Ordering::SeqCst);
119 let _ = sys_thread_sync(
120 &mut [ThreadSync::new_wake(ThreadSyncWake::new(
121 ThreadSyncReference::Virtual(&self.pipe.base().writers),
122 usize::MAX,
123 ))],
124 None,
125 )
126 .inspect_err(|e| tracing::warn!("failed to wake on writers: {e}"));
127 }
128
129 pub fn close_reader(&self) {
130 if !self.reader.swap(false, Ordering::SeqCst) {
131 return;
132 }
133 if self.readers() == 0 {
134 return;
135 }
136
137 self.pipe.base().readers.fetch_sub(1, Ordering::SeqCst);
138
139 let _ = sys_thread_sync(
140 &mut [ThreadSync::new_wake(ThreadSyncWake::new(
141 ThreadSyncReference::Virtual(&self.pipe.base().readers),
142 usize::MAX,
143 ))],
144 None,
145 )
146 .inspect_err(|e| tracing::warn!("failed to wake on readers: {e}"));
147 }
148
149 pub fn close_writer(&self) {
150 if !self.writer.swap(false, Ordering::SeqCst) {
151 return;
152 }
153 if self.writers() == 0 {
154 return;
155 }
156 self.pipe.base().writers.fetch_sub(1, Ordering::SeqCst);
157
158 let _ = sys_thread_sync(
159 &mut [ThreadSync::new_wake(ThreadSyncWake::new(
160 ThreadSyncReference::Virtual(&self.pipe.base().writers),
161 usize::MAX,
162 ))],
163 None,
164 )
165 .inspect_err(|e| tracing::warn!("failed to wake on writers: {e}"));
166 }
167
168 fn do_sleep(&self, sync: ThreadSyncSleep) -> std::io::Result<()> {
169 let readers = self.readers();
170 let reader_sync = ThreadSync::new_sleep(ThreadSyncSleep::new(
171 ThreadSyncReference::Virtual(&self.pipe.base().readers),
172 readers,
173 ThreadSyncOp::Equal,
174 ThreadSyncFlags::empty(),
175 ));
176 let writers = self.writers();
177 let writer_sync = ThreadSync::new_sleep(ThreadSyncSleep::new(
178 ThreadSyncReference::Virtual(&self.pipe.base().writers),
179 writers,
180 ThreadSyncOp::Equal,
181 ThreadSyncFlags::empty(),
182 ));
183 sys_thread_sync(
184 &mut [ThreadSync::new_sleep(sync), reader_sync, writer_sync],
185 None,
186 )?;
187 Ok(())
188 }
189
190 pub fn has_pending_data(&self) -> bool {
191 !self.pipe.base().buffer.is_empty()
192 }
193
194 pub fn has_avail_space(&self) -> bool {
195 self.pipe.base().buffer.avail_space() > 0
196 }
197}
198
199impl Pipe {
200 pub fn read(&self, buf: &mut [u8], nb: bool) -> std::io::Result<usize> {
201 let writers = self.writers();
202 let sync = self.pipe.base().buffer.sync_for_pending_data();
203 let count = self.pipe.base().buffer.read_bytes(buf)?;
204 if count == 0 && buf.len() > 0 && writers > 0 {
205 if nb {
206 return Err(ErrorKind::WouldBlock.into());
207 }
208 self.do_sleep(sync)?;
209 return self.read(buf, nb);
210 }
211 Ok(count)
212 }
213}
214
215impl Pipe {
216 pub fn write(&self, buf: &[u8], nb: bool) -> std::io::Result<usize> {
217 let readers = self.readers();
218 let sync = self.pipe.base().buffer.sync_for_avail_space();
219 if readers == 0 {
220 return Err(ErrorKind::BrokenPipe.into());
221 }
222 let count = self.pipe.base().buffer.write_bytes(buf)?;
223 if count == 0 && buf.len() > 0 && readers > 0 {
224 if nb {
225 return Err(ErrorKind::WouldBlock.into());
226 }
227 self.do_sleep(sync)?;
228 return self.write(buf, nb);
229 }
230 Ok(count)
231 }
232
233 pub fn flush(&self) -> std::io::Result<()> {
234 Ok(())
235 }
236}
237
238impl Clone for Pipe {
239 fn clone(&self) -> Self {
240 let reader = self.reader.load(Ordering::SeqCst);
241 let writer = self.writer.load(Ordering::SeqCst);
242 if reader {
243 self.increment_reader();
244 }
245 if writer {
246 self.increment_writer();
247 }
248 Self {
249 pipe: self.pipe.clone(),
250 reader: AtomicBool::new(reader),
251 writer: AtomicBool::new(writer),
252 }
253 }
254}
255
256impl Drop for Pipe {
257 fn drop(&mut self) {
258 if self.reader.load(Ordering::SeqCst) {
259 self.close_reader();
260 }
261 if self.writer.load(Ordering::SeqCst) {
262 self.close_writer();
263 }
264 }
265}