object_store/paged_object_store.rs

#![allow(async_fn_in_trait)]

use core::str;
use std::{future::Future, io::ErrorKind, ops::Add, thread::yield_now};

use async_io::block_on;
use mayheap::Vec;
#[cfg(target_os = "twizzler")]
use twizzler::Result;
#[cfg(target_os = "twizzler")]
pub use twizzler_abi::pager::PhysRange;
#[cfg(not(target_os = "twizzler"))]
use std::io::Result;

use crate::PAGE_SIZE;

pub type ObjID = u128;

/// Capacity of the fixed-size scratch vectors used throughout this module.
pub const MAYHEAP_LEN: usize = 16;

/// A contiguous range of physical memory. On Twizzler this is the pager ABI
/// type; on other targets we define a minimal stand-in for testing.
#[cfg(not(target_os = "twizzler"))]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PhysRange {
    pub start: u64,
    pub end: u64,
}

#[cfg(not(target_os = "twizzler"))]
impl PhysRange {
    /// Number of whole pages covered by this range. This mirrors the method
    /// that the paging code below assumes on the Twizzler ABI type.
    pub fn page_count(&self) -> usize {
        ((self.end - self.start) as usize) / PAGE_SIZE
    }
}

const PAGED_MEM_WIRED: u32 = 1;
const PAGED_MEM_COMPLETED: u32 = 2;

/// A physical memory range plus paging state flags (wired, completed).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagedPhysMem {
    pub range: PhysRange,
    flags: u32,
}

impl core::ops::Add<u64> for PagedPhysMem {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        if rhs == 0 {
            self
        } else {
            // Any nonzero offset yields the single page immediately
            // following this range, with the flags preserved.
            Self {
                range: PhysRange {
                    start: self.range.end,
                    end: self.range.end + PAGE_SIZE as u64,
                },
                flags: self.flags,
            }
        }
    }
}

impl PagedPhysMem {
    pub fn new(range: PhysRange) -> Self {
        PagedPhysMem { range, flags: 0 }
    }

    /// Has this range already been satisfied (no transfer needed)?
    pub fn is_completed(&self) -> bool {
        self.flags & PAGED_MEM_COMPLETED != 0
    }

    /// Is this range wired (pinned in physical memory)?
    pub fn is_wired(&self) -> bool {
        self.flags & PAGED_MEM_WIRED != 0
    }

    pub fn set_completed(&mut self) {
        self.flags |= PAGED_MEM_COMPLETED;
    }

    /// Builder-style variant of [`Self::set_completed`].
    pub fn completed(mut self) -> Self {
        self.set_completed();
        self
    }

    /// Builder-style variant of [`Self::set_wired`].
    pub fn wired(mut self) -> Self {
        self.set_wired();
        self
    }

    pub fn set_wired(&mut self) {
        self.flags |= PAGED_MEM_WIRED;
    }

    /// Length of the range in bytes.
    pub fn len(&self) -> usize {
        (self.range.end - self.range.start) as usize
    }

    /// Length of the range in whole pages.
    pub fn nr_pages(&self) -> usize {
        self.len() / PAGE_SIZE
    }
}

/// A run-length encoded extent of device pages: either `len` contiguous
/// pages starting at a device page number, or a hole of `len` unbacked pages.
#[derive(Clone, Copy, Debug)]
pub enum DevicePage {
    Run(u64, u32),
    Hole(u32),
}

impl DevicePage {
    /// Build a run-length encoding from a per-page address array, where 0
    /// marks a hole, coalescing adjacent holes and consecutive runs.
    pub fn from_array(array: &[u64]) -> Vec<Self, MAYHEAP_LEN> {
        let mut tmp = Vec::<Self, MAYHEAP_LEN>::new();
        for item in array {
            let item = if *item == 0 {
                DevicePage::Hole(1)
            } else {
                DevicePage::Run(*item, 1)
            };
            if let Some(prev) = tmp.last_mut() {
                if !prev.try_extend(&item) {
                    tmp.push(item).unwrap();
                }
            } else {
                tmp.push(item).unwrap();
            }
        }
        tmp
    }

    /// Try to merge `other` onto the end of this extent. Succeeds for two
    /// holes, or for two runs where `other` starts exactly where `self` ends.
    pub fn try_extend(&mut self, other: &DevicePage) -> bool {
        let new_val = match (*self, other) {
            (DevicePage::Hole(len1), &DevicePage::Hole(len2)) => {
                Some(DevicePage::Hole(len1 + len2))
            }
            (DevicePage::Run(start1, len1), &DevicePage::Run(start2, len2)) => {
                if start1 + len1 as u64 == start2 {
                    Some(DevicePage::Run(start1, len1 + len2))
                } else {
                    None
                }
            }
            _ => None,
        };
        if let Some(new_val) = new_val {
            *self = new_val;
            true
        } else {
            false
        }
    }

    pub fn nr_pages(&self) -> usize {
        match self {
            DevicePage::Run(_, len) => *len as usize,
            DevicePage::Hole(len) => *len as usize,
        }
    }

    pub fn as_hole(&self) -> Option<u32> {
        match self {
            DevicePage::Hole(len) => Some(*len),
            _ => None,
        }
    }

    /// Consume up to `*avail_len` pages from the front of this extent. On
    /// return, `*avail_len` holds the number of pages actually consumed.
    pub fn offset(&mut self, avail_len: &mut usize) {
        let new = match self {
            DevicePage::Run(start, len) => {
                let new_len = len.saturating_sub(*avail_len as u32);
                let consumed = *len - new_len;
                *avail_len = consumed as usize;
                DevicePage::Run(*start + consumed as u64, new_len)
            }
            DevicePage::Hole(len) => {
                let new_len = len.saturating_sub(*avail_len as u32);
                let consumed = *len - new_len;
                *avail_len = consumed as usize;
                DevicePage::Hole(new_len)
            }
        };
        *self = new;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_try_extend_holes() {
        let mut hole1 = DevicePage::Hole(10);
        let hole2 = DevicePage::Hole(5);

        assert!(hole1.try_extend(&hole2));
        match hole1 {
            DevicePage::Hole(len) => assert_eq!(len, 15),
            _ => panic!("Expected Hole"),
        }
    }

    #[test]
    fn test_try_extend_consecutive_runs() {
        let mut run1 = DevicePage::Run(100, 10);
        let run2 = DevicePage::Run(110, 5);

        assert!(run1.try_extend(&run2));
        match run1 {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 15);
            }
            _ => panic!("Expected Run"),
        }
    }

    #[test]
    fn test_try_extend_non_consecutive_runs() {
        let mut run1 = DevicePage::Run(100, 10);
        let run2 = DevicePage::Run(120, 5);

        assert!(!run1.try_extend(&run2));
        match run1 {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 10); // Should remain unchanged
            }
            _ => panic!("Expected Run"),
        }
    }

    #[test]
    fn test_try_extend_mixed_types() {
        let mut hole = DevicePage::Hole(10);
        let run = DevicePage::Run(100, 5);

        assert!(!hole.try_extend(&run));
        match hole {
            DevicePage::Hole(len) => assert_eq!(len, 10), // Should remain unchanged
            _ => panic!("Expected Hole"),
        }

        let mut run = DevicePage::Run(100, 10);
        let hole = DevicePage::Hole(5);

        assert!(!run.try_extend(&hole));
        match run {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 10); // Should remain unchanged
            }
            _ => panic!("Expected Run"),
        }
    }
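
    // Additional coverage (a sketch): these tests pin down the coalescing
    // behavior of `from_array`, the page-consuming contract of `offset`, and
    // the `PagedPhysMem` flag builders, with expected values derived by hand
    // from the implementations above.
    #[test]
    fn test_from_array_coalesces() {
        let pages = DevicePage::from_array(&[0, 0, 5, 6, 7, 0]);
        assert_eq!(pages.len(), 3);
        assert_eq!(pages[0].as_hole(), Some(2));
        match pages[1] {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 5);
                assert_eq!(len, 3);
            }
            _ => panic!("Expected Run"),
        }
        assert_eq!(pages[2].as_hole(), Some(1));
    }

    #[test]
    fn test_offset_consumes_leading_pages() {
        // Consume 4 pages of a 10-page run: the run advances by 4 and
        // `avail` reports the number of pages actually consumed.
        let mut run = DevicePage::Run(100, 10);
        let mut avail = 4;
        run.offset(&mut avail);
        assert_eq!(avail, 4);
        match run {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 104);
                assert_eq!(len, 6);
            }
            _ => panic!("Expected Run"),
        }
    }

    // Host-only: constructs the stand-in PhysRange by struct literal.
    #[cfg(not(target_os = "twizzler"))]
    #[test]
    fn test_paged_phys_mem_flags() {
        let mem = PagedPhysMem::new(PhysRange {
            start: 0,
            end: 2 * PAGE_SIZE as u64,
        });
        assert!(!mem.is_wired() && !mem.is_completed());
        assert_eq!(mem.nr_pages(), 2);
        let mem = mem.wired().completed();
        assert!(mem.is_wired() && mem.is_completed());
    }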
}

/// A device that can service paging transfers between device pages and
/// physical memory.
pub trait PagedDevice {
    /// Append the physical ranges backing this device extent to `phys_list`,
    /// returning the number of appended pages.
    async fn phys_addrs(
        &self,
        _start: DevicePage,
        _phys_list: &mut Vec<PagedPhysMem, MAYHEAP_LEN>,
    ) -> Result<usize> {
        Err(std::io::ErrorKind::Unsupported.into())
    }

    /// Read pages sequentially from device page `start` into the given
    /// physical ranges, returning the number of pages transferred.
    async fn sequential_read(&self, start: u64, list: &[PhysRange]) -> Result<usize>;
    /// Write pages sequentially to device page `start` from the given
    /// physical ranges, returning the number of pages transferred.
    async fn sequential_write(&self, start: u64, list: &[PhysRange]) -> Result<usize>;

    /// Total device length.
    async fn len(&self) -> Result<usize>;

    fn yield_now(&self) {
        yield_now();
    }

    fn run_async<R: 'static>(&self, f: impl Future<Output = R>) -> R {
        block_on(f)
    }
}

/// A single paging request: `nr_pages` object pages starting at `start_page`,
/// backed by the physical ranges in `phys_list`.
#[derive(Debug)]
pub struct PageRequest {
    pub start_page: i64,
    pub nr_pages: u32,
    pub completed: u32,
    pub phys_list: Vec<PagedPhysMem, MAYHEAP_LEN>,
}

impl PageRequest {
    pub fn new(start_page: i64, nr_pages: u32) -> Self {
        Self {
            start_page,
            phys_list: Vec::new(),
            nr_pages,
            completed: 0,
        }
    }

    pub fn new_from_list(
        phys_list: Vec<PagedPhysMem, MAYHEAP_LEN>,
        start_page: i64,
        nr_pages: u32,
    ) -> Self {
        Self {
            start_page,
            phys_list,
            nr_pages,
            completed: 0,
        }
    }

    pub fn into_list(self) -> Vec<PagedPhysMem, MAYHEAP_LEN> {
        self.phys_list
    }

    /// Collect the physical ranges backing `disk_pages` from the device, and
    /// recompute `nr_pages` and `completed` from the resulting list.
    async fn setup_phys<PD: PagedDevice>(
        &mut self,
        disk_pages: &[DevicePage],
        device: &PD,
    ) -> Result<()> {
        // TODO: recover these
        self.phys_list.clear();
        for page in disk_pages {
            let mut count = 0;
            while count < page.nr_pages() {
                match device.phys_addrs(*page, &mut self.phys_list).await {
                    Ok(0) => break,
                    Ok(r) => count += r,
                    Err(e) => {
                        let err: std::io::Error = e.into();
                        if err.kind() == ErrorKind::OutOfMemory && !self.phys_list.is_empty() {
                            // The fixed-capacity list filled up; continue
                            // with a partial request.
                            break;
                        }
                        tracing::error!("failed to collect physical ranges: {}", err);
                        return Err(err.kind().into());
                    }
                }
            }

            if count < page.nr_pages() {
                break;
            }
        }

        self.nr_pages =
            self.phys_list
                .iter()
                .fold(0usize, |acc, range| acc + range.len() / PAGE_SIZE) as u32;
        self.completed =
            self.phys_list
                .iter()
                .filter(|p| p.is_completed())
                .fold(0usize, |acc, range| acc + range.len() / PAGE_SIZE) as u32;
        Ok(())
    }

    /// Page data in from the device: gather physical ranges, then issue
    /// sequential reads for each run of device pages. Returns the number of
    /// pages transferred.
    pub async fn page_in<PD: PagedDevice>(
        &mut self,
        disk_pages: &[DevicePage],
        device: &PD,
    ) -> Result<usize> {
        self.setup_phys(disk_pages, device).await?;
        if self.phys_list.iter().all(|p| p.is_completed()) {
            return Ok(self.nr_pages as usize);
        }

        let mut cursor = 0;
        let mut inner_cursor = 0;
        let mut tfer_count = 0;
        let mut tmp: Vec<PhysRange, MAYHEAP_LEN> = Vec::new();
        for disk_page in disk_pages {
            let mut count = 0;
            tmp.clear();
            // Slice the physical ranges against this extent, building the
            // list of target ranges for a single sequential transfer.
            while count < disk_page.nr_pages() {
                let thislen = (disk_page.nr_pages() - count)
                    .min(self.phys_list[cursor].nr_pages() - inner_cursor);

                let new_range = PhysRange {
                    start: self.phys_list[cursor].range.start + (inner_cursor * PAGE_SIZE) as u64,
                    end: self.phys_list[cursor].range.start
                        + (inner_cursor * PAGE_SIZE) as u64
                        + (thislen * PAGE_SIZE) as u64,
                };

                tmp.push(new_range).unwrap();
                // Count the chunk before the cursor check, so the final
                // chunk isn't dropped when the phys list is exhausted.
                count += thislen;

                inner_cursor += thislen;
                if inner_cursor >= self.phys_list[cursor].nr_pages() {
                    cursor += 1;
                    inner_cursor = 0;
                    if cursor >= self.phys_list.len() {
                        break;
                    }
                }
            }

            // Holes have no backing device pages, so only runs transfer data.
            if let DevicePage::Run(start, _len) = disk_page {
                let mut count = 0;
                let mut idx = 0;
                while idx < tmp.len() {
                    let mut r = device
                        .sequential_read(*start + count as u64, &tmp[idx..])
                        .await
                        .inspect_err(|e| tracing::error!("read err: {}", e))?;

                    // Consume fully transferred ranges; trim a partially
                    // transferred range and retry from there.
                    while r > 0 {
                        let thiscount: usize = tmp[idx].page_count().min(r);
                        if tmp[idx].page_count() == thiscount {
                            idx += 1;
                        } else {
                            tmp[idx] = PhysRange {
                                start: tmp[idx].start + (thiscount * PAGE_SIZE) as u64,
                                end: tmp[idx].end,
                            };
                        }
                        r -= thiscount;
                        count += thiscount;
                    }
                }
            }

            tfer_count += count;

            if cursor >= self.phys_list.len() {
                break;
            }
        }

        Ok(tfer_count)
    }

    /// Page data out to the device: the mirror of [`Self::page_in`], issuing
    /// sequential writes from the physical ranges already in `phys_list`.
    pub async fn page_out<PD: PagedDevice>(
        &mut self,
        disk_pages: &[DevicePage],
        device: &PD,
    ) -> Result<usize> {
        let mut cursor = 0;
        let mut inner_cursor = 0;
        let mut tfer_count = 0;
        let mut tmp: Vec<PhysRange, MAYHEAP_LEN> = Vec::new();
        for disk_page in disk_pages {
            let mut count = 0;
            tmp.clear();
            // Slice the physical ranges against this extent, building the
            // list of source ranges for a single sequential transfer.
            while count < disk_page.nr_pages() {
                let thislen = (disk_page.nr_pages() - count)
                    .min(self.phys_list[cursor].nr_pages() - inner_cursor);

                let new_range = PhysRange {
                    start: self.phys_list[cursor].range.start + (inner_cursor * PAGE_SIZE) as u64,
                    end: self.phys_list[cursor].range.start
                        + (inner_cursor * PAGE_SIZE) as u64
                        + (thislen * PAGE_SIZE) as u64,
                };

                tmp.push(new_range).unwrap();
                // Count the chunk before the cursor check, so the final
                // chunk isn't dropped when the phys list is exhausted.
                count += thislen;

                inner_cursor += thislen;
                if inner_cursor >= self.phys_list[cursor].nr_pages() {
                    cursor += 1;
                    inner_cursor = 0;
                    if cursor >= self.phys_list.len() {
                        break;
                    }
                }
            }

            // Holes have no backing device pages, so only runs transfer data.
            if let DevicePage::Run(start, _len) = disk_page {
                let mut count = 0;
                let mut idx = 0;
                while idx < tmp.len() {
                    let mut r = device
                        .sequential_write(*start + count as u64, &tmp[idx..])
                        .await
                        .inspect_err(|e| tracing::error!("write err: {}", e))?;

                    // Consume fully transferred ranges; trim a partially
                    // transferred range and retry from there.
                    while r > 0 {
                        let thiscount: usize = tmp[idx].page_count().min(r);
                        if tmp[idx].page_count() == thiscount {
                            idx += 1;
                        } else {
                            tmp[idx] = PhysRange {
                                start: tmp[idx].start + (thiscount * PAGE_SIZE) as u64,
                                end: tmp[idx].end,
                            };
                        }
                        r -= thiscount;
                        count += thiscount;
                    }
                }
            }

            tfer_count += count;

            if cursor >= self.phys_list.len() {
                break;
            }
        }

        Ok(tfer_count)
    }
}
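
// Illustrative only (host-side test, a minimal sketch): a `PagedDevice`
// whose "disk" and "physical memory" are plain in-process buffers, with
// `PhysRange` interpreted as byte offsets into `ram`. This demonstrates the
// `phys_addrs` / `sequential_read` protocol that `PageRequest::page_in`
// drives; `MockDevice` and all names inside are invented for this sketch
// and are not part of the pager API.
#[cfg(all(test, not(target_os = "twizzler")))]
mod paging_tests {
    use std::sync::Mutex;

    use super::*;

    struct MockDevice {
        disk: Mutex<std::vec::Vec<u8>>,
        ram: Mutex<std::vec::Vec<u8>>,
        next_frame: Mutex<u64>,
    }

    impl MockDevice {
        fn new(disk_pages: usize, ram_pages: usize) -> Self {
            Self {
                disk: Mutex::new(vec![0; disk_pages * PAGE_SIZE]),
                ram: Mutex::new(vec![0; ram_pages * PAGE_SIZE]),
                next_frame: Mutex::new(0),
            }
        }
    }

    impl PagedDevice for MockDevice {
        // Bump-allocate one single-page frame per requested page.
        async fn phys_addrs(
            &self,
            start: DevicePage,
            phys_list: &mut Vec<PagedPhysMem, MAYHEAP_LEN>,
        ) -> Result<usize> {
            let mut next = self.next_frame.lock().unwrap();
            let mut appended = 0;
            for _ in 0..start.nr_pages() {
                let base = *next * PAGE_SIZE as u64;
                let range = PhysRange {
                    start: base,
                    end: base + PAGE_SIZE as u64,
                };
                if phys_list.push(PagedPhysMem::new(range)).is_err() {
                    break;
                }
                *next += 1;
                appended += 1;
            }
            Ok(appended)
        }

        async fn sequential_read(&self, start: u64, list: &[PhysRange]) -> Result<usize> {
            let disk = self.disk.lock().unwrap();
            let mut ram = self.ram.lock().unwrap();
            let mut disk_off = start as usize * PAGE_SIZE;
            let mut pages = 0;
            for range in list {
                let len = (range.end - range.start) as usize;
                ram[range.start as usize..range.end as usize]
                    .copy_from_slice(&disk[disk_off..disk_off + len]);
                disk_off += len;
                pages += len / PAGE_SIZE;
            }
            Ok(pages)
        }

        async fn sequential_write(&self, start: u64, list: &[PhysRange]) -> Result<usize> {
            let mut disk = self.disk.lock().unwrap();
            let ram = self.ram.lock().unwrap();
            let mut disk_off = start as usize * PAGE_SIZE;
            let mut pages = 0;
            for range in list {
                let len = (range.end - range.start) as usize;
                disk[disk_off..disk_off + len]
                    .copy_from_slice(&ram[range.start as usize..range.end as usize]);
                disk_off += len;
                pages += len / PAGE_SIZE;
            }
            Ok(pages)
        }

        async fn len(&self) -> Result<usize> {
            Ok(self.disk.lock().unwrap().len())
        }
    }

    #[test]
    fn page_in_copies_disk_pages_into_frames() {
        let device = MockDevice::new(4, 4);
        device.disk.lock().unwrap()[2 * PAGE_SIZE] = 0xaa; // first byte of disk page 2
        let mut req = PageRequest::new(0, 2);
        let n = block_on(req.page_in(&[DevicePage::Run(2, 2)], &device)).unwrap();
        assert_eq!(n, 2);
        // Frame 0 now holds the first byte of disk page 2.
        assert_eq!(device.ram.lock().unwrap()[0], 0xaa);
    }
}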

/// An object store that supports paging objects in and out by ID.
pub trait PagedObjectStore {
    /// Read the configured ID out of the reserved object 0, if present.
    async fn get_config_id(&self) -> Result<ObjID> {
        let mut buf = [0; 16];
        self.read_object(0, 0, &mut buf).await.and_then(|len| {
            if len == 16 && buf.iter().any(|x| *x != 0) {
                Ok(ObjID::from_le_bytes(buf))
            } else {
                Err(ErrorKind::InvalidData.into())
            }
        })
    }

    /// Store `id` in the reserved object 0, recreating it from scratch.
    async fn set_config_id(&self, id: ObjID) -> Result<()> {
        let _ = self.delete_object(0).await;
        self.create_object(0).await?;
        self.write_object(0, 0, &id.to_le_bytes()).await
    }

    async fn create_object(&self, id: ObjID) -> Result<()>;
    async fn delete_object(&self, id: ObjID) -> Result<()>;

    async fn len(&self, id: ObjID) -> Result<u64>;

    async fn read_object(&self, id: ObjID, offset: u64, buf: &mut [u8]) -> Result<usize>;
    async fn write_object(&self, id: ObjID, offset: u64, buf: &[u8]) -> Result<()>;

    async fn page_in_object<'a>(&self, id: ObjID, reqs: &'a mut [PageRequest]) -> Result<usize>;
    async fn page_out_object<'a>(&self, id: ObjID, reqs: &'a mut [PageRequest]) -> Result<usize>;

    async fn flush(&self) -> Result<()> {
        Ok(())
    }

    async fn enumerate_external(&self, _id: ObjID) -> Result<std::vec::Vec<ExternalFile>> {
        Err(ErrorKind::Unsupported.into())
    }

    async fn find_external(&self, _id: ObjID) -> Result<usize> {
        Err(ErrorKind::Unsupported.into())
    }
}
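
// Illustrative only (host-side test, a minimal sketch): an in-memory
// `PagedObjectStore` just rich enough to exercise the `get_config_id` /
// `set_config_id` default methods above. `MemStore` is an invented name;
// paging is left unsupported.
#[cfg(all(test, not(target_os = "twizzler")))]
mod store_tests {
    use std::{collections::HashMap, sync::Mutex};

    use super::*;

    #[derive(Default)]
    struct MemStore {
        objects: Mutex<HashMap<ObjID, std::vec::Vec<u8>>>,
    }

    impl PagedObjectStore for MemStore {
        async fn create_object(&self, id: ObjID) -> Result<()> {
            self.objects.lock().unwrap().insert(id, std::vec::Vec::new());
            Ok(())
        }

        async fn delete_object(&self, id: ObjID) -> Result<()> {
            self.objects.lock().unwrap().remove(&id);
            Ok(())
        }

        async fn len(&self, id: ObjID) -> Result<u64> {
            let objects = self.objects.lock().unwrap();
            let obj = objects.get(&id).ok_or(ErrorKind::NotFound)?;
            Ok(obj.len() as u64)
        }

        async fn read_object(&self, id: ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
            let objects = self.objects.lock().unwrap();
            let obj = objects.get(&id).ok_or(ErrorKind::NotFound)?;
            let start = (offset as usize).min(obj.len());
            let len = buf.len().min(obj.len() - start);
            buf[..len].copy_from_slice(&obj[start..start + len]);
            Ok(len)
        }

        async fn write_object(&self, id: ObjID, offset: u64, buf: &[u8]) -> Result<()> {
            let mut objects = self.objects.lock().unwrap();
            let obj = objects.get_mut(&id).ok_or(ErrorKind::NotFound)?;
            let end = offset as usize + buf.len();
            if obj.len() < end {
                obj.resize(end, 0);
            }
            obj[offset as usize..end].copy_from_slice(buf);
            Ok(())
        }

        async fn page_in_object<'a>(
            &self,
            _id: ObjID,
            _reqs: &'a mut [PageRequest],
        ) -> Result<usize> {
            Err(ErrorKind::Unsupported.into())
        }

        async fn page_out_object<'a>(
            &self,
            _id: ObjID,
            _reqs: &'a mut [PageRequest],
        ) -> Result<usize> {
            Err(ErrorKind::Unsupported.into())
        }
    }

    #[test]
    fn config_id_roundtrip() {
        let store = MemStore::default();
        // No object 0 yet: get_config_id reports an error.
        assert!(block_on(store.get_config_id()).is_err());
        block_on(store.set_config_id(0xdead_beef)).unwrap();
        assert_eq!(block_on(store.get_config_id()).unwrap(), 0xdead_beef);
    }
}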

pub const MAX_EXTERNAL_PATH: usize = 4096;
pub const NAME_MAX: usize = 256;

/// An entry for a file external to the object store (e.g. exposed through a
/// host filesystem), with a fixed-size name buffer.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
#[repr(C)]
pub struct ExternalFile {
    pub id: ObjID,
    pub name: [u8; NAME_MAX],
    pub name_len: u32,
    pub kind: ExternalKind,
}

impl ExternalFile {
    /// Build an entry from a raw name, truncating it to `NAME_MAX` bytes.
    pub fn new(iname: &[u8], kind: ExternalKind, id: ObjID) -> Self {
        let name_len = iname.len().min(NAME_MAX);
        let mut name = [0; NAME_MAX];
        name[0..name_len].copy_from_slice(&iname[0..name_len]);
        Self {
            id,
            name,
            kind,
            name_len: name_len as u32,
        }
    }

    /// The name as a string, or `None` if it isn't valid UTF-8.
    pub fn name(&self) -> Option<&str> {
        str::from_utf8(&self.name[0..(self.name_len as usize)]).ok()
    }
}

/// The kind of an external file entry.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
#[repr(u32)]
pub enum ExternalKind {
    Regular,
    Directory,
    SymLink,
    Other,
}
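
// Illustrative test (a sketch pinned to the code above): `ExternalFile::new`
// truncates names longer than NAME_MAX, and `name` round-trips valid UTF-8.
#[cfg(test)]
mod external_file_tests {
    use super::*;

    #[test]
    fn name_roundtrip_and_truncation() {
        let file = ExternalFile::new(b"hello.txt", ExternalKind::Regular, 7);
        assert_eq!(file.name(), Some("hello.txt"));
        assert_eq!(file.id, 7);

        let long = [b'a'; NAME_MAX + 10];
        let file = ExternalFile::new(&long, ExternalKind::Regular, 8);
        assert_eq!(file.name_len as usize, NAME_MAX);
        let expected = "a".repeat(NAME_MAX);
        assert_eq!(file.name(), Some(expected.as_str()));
    }
}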

/// Map an ObjID to an inode number, if it carries the external-file tag.
/// ObjID 1 is reserved for the root and maps to ino 0; tagged IDs have bit
/// 127 set and carry the ino in their low bits (bit 63 is masked off).
pub fn objid_to_ino(id: ObjID) -> Option<u32> {
    if id == 1 {
        return Some(0);
    }
    let (hi, lo) = ((id >> 64) as u64, id as u64);
    if hi == (1u64 << 63) {
        let ino = lo & !(1u64 << 63);
        Some(ino as u32)
    } else {
        None
    }
}

/// Inverse of [`objid_to_ino`]: tag an inode number as an ObjID.
pub fn ino_to_objid(ino: u32) -> ObjID {
    if ino == 0 {
        return 1;
    }
    (1u128 << 127) | (ino as u128) | (1u128 << 63)
}
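
// Illustrative test of the ino <-> ObjID mapping defined above: ino 0 maps
// to the reserved ObjID 1, and other inos round-trip through the tagged
// encoding; expected values follow directly from the two functions.
#[cfg(test)]
mod ino_tests {
    use super::*;

    #[test]
    fn ino_objid_roundtrip() {
        assert_eq!(ino_to_objid(0), 1);
        assert_eq!(objid_to_ino(1), Some(0));
        for ino in [1u32, 2, 42, u32::MAX] {
            assert_eq!(objid_to_ino(ino_to_objid(ino)), Some(ino));
        }
        // IDs without the external tag bits don't map to an ino.
        assert_eq!(objid_to_ino(2), None);
    }
}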

/// Split `data` into maximal runs of consecutive values (where each element
/// is exactly one greater than its predecessor). Currently unused.
pub(crate) fn _consecutive_slices<T: PartialEq + Add<u64> + Copy>(
    data: &[T],
) -> impl Iterator<Item = &[T]>
where
    T::Output: PartialEq<T>,
{
    let mut slice_start = 0;
    (1..=data.len()).flat_map(move |i| {
        if i == data.len() || data[i - 1] + 1u64 != data[i] {
            let begin = slice_start;
            slice_start = i;
            Some(&data[begin..i])
        } else {
            None
        }
    })
}
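
// Illustrative test (a sketch): `_consecutive_slices` groups a sorted slice
// into maximal runs of consecutive values, as described above.
#[cfg(test)]
mod slice_tests {
    use super::*;

    #[test]
    fn groups_consecutive_values() {
        let data: [u64; 6] = [1, 2, 3, 7, 8, 10];
        let runs: std::vec::Vec<&[u64]> = _consecutive_slices(&data).collect();
        assert_eq!(runs, [&[1u64, 2, 3][..], &[7, 8][..], &[10][..]]);
    }
}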

/// Positional I/O: reads and writes at explicit offsets.
pub trait PosIo {
    async fn read(&self, start: u64, buf: &mut [u8]) -> Result<usize>;
    async fn write(&self, start: u64, buf: &[u8]) -> Result<usize>;
}