twizzler_io/
pipe.rs

1use std::{
2    io::ErrorKind,
3    sync::atomic::{AtomicBool, AtomicU64, Ordering},
4};
5
6use twizzler::{
7    BaseType, Invariant,
8    object::{MapFlags, ObjID, Object, ObjectBuilder, TypedObject},
9};
10use twizzler_abi::syscall::{
11    ObjectCreate, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference, ThreadSyncSleep,
12    ThreadSyncWake, sys_thread_sync,
13};
14
15use crate::buffer::VolatileBuffer;
16
/// Size parameter of the [`VolatileBuffer`] embedded in [`PipeBase`].
///
/// NOTE(review): presumably the buffer capacity in bytes — confirm against `VolatileBuffer`.
pub const BUF_SZ: usize = 4096;
18
/// Shared pipe state, stored as the base of the pipe object.
///
/// NOTE(review): the type derives `Invariant`, so its field layout is presumably part of the
/// shared-object format — confirm before reordering or resizing fields.
#[derive(Invariant, BaseType)]
pub struct PipeBase {
    /// Count of attached reader handles. It is also used as a sleep/wake word: it is woken
    /// whenever the count changes.
    readers: AtomicU64,
    /// Count of attached writer handles. It is also used as a sleep/wake word: it is woken
    /// whenever the count changes.
    writers: AtomicU64,
    /// Data buffer that carries the bytes moving through the pipe.
    buffer: VolatileBuffer<BUF_SZ>,
}
25
26impl PipeBase {
27    pub fn new() -> Self {
28        Self {
29            readers: AtomicU64::new(1),
30            writers: AtomicU64::new(1),
31            buffer: VolatileBuffer::new(),
32        }
33    }
34}
35
/// A handle onto a pipe object.
///
/// Each handle tracks locally whether it currently counts as a reader and/or a writer; the
/// shared totals live in the object's [`PipeBase`].
pub struct Pipe {
    /// The underlying object whose base is the shared [`PipeBase`].
    pub pipe: Object<PipeBase>,
    /// `true` while this handle is counted in the shared reader count.
    reader: AtomicBool,
    /// `true` while this handle is counted in the shared writer count.
    writer: AtomicBool,
}
41
42impl Pipe {
43    pub fn create_object(spec: ObjectCreate) -> std::io::Result<Self> {
44        let obj = ObjectBuilder::new(spec).build(PipeBase::new())?;
45        Ok(Self {
46            pipe: obj,
47            reader: AtomicBool::new(true),
48            writer: AtomicBool::new(true),
49        })
50    }
51
52    pub fn open_object(id: ObjID) -> std::io::Result<Self> {
53        let obj =
54            unsafe { Object::<PipeBase>::map_unchecked(id, MapFlags::READ | MapFlags::WRITE) }?;
55        let this = Self {
56            pipe: obj,
57            reader: AtomicBool::new(true),
58            writer: AtomicBool::new(true),
59        };
60        this.increment_reader();
61        this.increment_writer();
62        Ok(this)
63    }
64
65    pub fn id(&self) -> ObjID {
66        self.pipe.id()
67    }
68
69    pub fn readers(&self) -> u64 {
70        self.pipe.base().readers.load(Ordering::SeqCst)
71    }
72
73    pub fn writers(&self) -> u64 {
74        self.pipe.base().writers.load(Ordering::SeqCst)
75    }
76
77    pub fn read_waitpoint(&self) -> ThreadSyncSleep {
78        self.pipe.base().buffer.sync_for_pending_data()
79    }
80
81    pub fn write_waitpoint(&self) -> ThreadSyncSleep {
82        self.pipe.base().buffer.sync_for_avail_space()
83    }
84
85    pub fn is_reader(&self) -> bool {
86        self.reader.load(Ordering::SeqCst)
87    }
88
89    pub fn is_writer(&self) -> bool {
90        self.writer.load(Ordering::SeqCst)
91    }
92
93    pub fn enable_reader(&self) {
94        if !self.reader.swap(true, Ordering::SeqCst) {
95            self.increment_reader();
96        }
97    }
98
99    pub fn increment_reader(&self) {
100        self.pipe.base().readers.fetch_add(1, Ordering::SeqCst);
101        let _ = sys_thread_sync(
102            &mut [ThreadSync::new_wake(ThreadSyncWake::new(
103                ThreadSyncReference::Virtual(&self.pipe.base().readers),
104                usize::MAX,
105            ))],
106            None,
107        )
108        .inspect_err(|e| tracing::warn!("failed to wake on readers: {e}"));
109    }
110
111    pub fn enable_writer(&self) {
112        if !self.writer.swap(true, Ordering::SeqCst) {
113            self.increment_writer();
114        }
115    }
116
117    pub fn increment_writer(&self) {
118        self.pipe.base().writers.fetch_add(1, Ordering::SeqCst);
119        let _ = sys_thread_sync(
120            &mut [ThreadSync::new_wake(ThreadSyncWake::new(
121                ThreadSyncReference::Virtual(&self.pipe.base().writers),
122                usize::MAX,
123            ))],
124            None,
125        )
126        .inspect_err(|e| tracing::warn!("failed to wake on writers: {e}"));
127    }
128
129    pub fn close_reader(&self) {
130        if !self.reader.swap(false, Ordering::SeqCst) {
131            return;
132        }
133        if self.readers() == 0 {
134            return;
135        }
136
137        self.pipe.base().readers.fetch_sub(1, Ordering::SeqCst);
138
139        let _ = sys_thread_sync(
140            &mut [ThreadSync::new_wake(ThreadSyncWake::new(
141                ThreadSyncReference::Virtual(&self.pipe.base().readers),
142                usize::MAX,
143            ))],
144            None,
145        )
146        .inspect_err(|e| tracing::warn!("failed to wake on readers: {e}"));
147    }
148
149    pub fn close_writer(&self) {
150        if !self.writer.swap(false, Ordering::SeqCst) {
151            return;
152        }
153        if self.writers() == 0 {
154            return;
155        }
156        self.pipe.base().writers.fetch_sub(1, Ordering::SeqCst);
157
158        let _ = sys_thread_sync(
159            &mut [ThreadSync::new_wake(ThreadSyncWake::new(
160                ThreadSyncReference::Virtual(&self.pipe.base().writers),
161                usize::MAX,
162            ))],
163            None,
164        )
165        .inspect_err(|e| tracing::warn!("failed to wake on writers: {e}"));
166    }
167
168    fn do_sleep(&self, sync: ThreadSyncSleep) -> std::io::Result<()> {
169        let readers = self.readers();
170        let reader_sync = ThreadSync::new_sleep(ThreadSyncSleep::new(
171            ThreadSyncReference::Virtual(&self.pipe.base().readers),
172            readers,
173            ThreadSyncOp::Equal,
174            ThreadSyncFlags::empty(),
175        ));
176        let writers = self.writers();
177        let writer_sync = ThreadSync::new_sleep(ThreadSyncSleep::new(
178            ThreadSyncReference::Virtual(&self.pipe.base().writers),
179            writers,
180            ThreadSyncOp::Equal,
181            ThreadSyncFlags::empty(),
182        ));
183        sys_thread_sync(
184            &mut [ThreadSync::new_sleep(sync), reader_sync, writer_sync],
185            None,
186        )?;
187        Ok(())
188    }
189
190    pub fn has_pending_data(&self) -> bool {
191        !self.pipe.base().buffer.is_empty()
192    }
193
194    pub fn has_avail_space(&self) -> bool {
195        self.pipe.base().buffer.avail_space() > 0
196    }
197}
198
199impl Pipe {
200    pub fn read(&self, buf: &mut [u8], nb: bool) -> std::io::Result<usize> {
201        let writers = self.writers();
202        let sync = self.pipe.base().buffer.sync_for_pending_data();
203        let count = self.pipe.base().buffer.read_bytes(buf)?;
204        if count == 0 && buf.len() > 0 && writers > 0 {
205            if nb {
206                return Err(ErrorKind::WouldBlock.into());
207            }
208            self.do_sleep(sync)?;
209            return self.read(buf, nb);
210        }
211        Ok(count)
212    }
213}
214
215impl Pipe {
216    pub fn write(&self, buf: &[u8], nb: bool) -> std::io::Result<usize> {
217        let readers = self.readers();
218        let sync = self.pipe.base().buffer.sync_for_avail_space();
219        if readers == 0 {
220            return Err(ErrorKind::BrokenPipe.into());
221        }
222        let count = self.pipe.base().buffer.write_bytes(buf)?;
223        if count == 0 && buf.len() > 0 && readers > 0 {
224            if nb {
225                return Err(ErrorKind::WouldBlock.into());
226            }
227            self.do_sleep(sync)?;
228            return self.write(buf, nb);
229        }
230        Ok(count)
231    }
232
233    pub fn flush(&self) -> std::io::Result<()> {
234        Ok(())
235    }
236}
237
238impl Clone for Pipe {
239    fn clone(&self) -> Self {
240        let reader = self.reader.load(Ordering::SeqCst);
241        let writer = self.writer.load(Ordering::SeqCst);
242        if reader {
243            self.increment_reader();
244        }
245        if writer {
246            self.increment_writer();
247        }
248        Self {
249            pipe: self.pipe.clone(),
250            reader: AtomicBool::new(reader),
251            writer: AtomicBool::new(writer),
252        }
253    }
254}
255
256impl Drop for Pipe {
257    fn drop(&mut self) {
258        if self.reader.load(Ordering::SeqCst) {
259            self.close_reader();
260        }
261        if self.writer.load(Ordering::SeqCst) {
262            self.close_writer();
263        }
264    }
265}