object_store/
paged_object_store.rs

// The public traits in this file declare `async fn` methods directly; the
// `async_fn_in_trait` lint warns that the returned futures carry no `Send` bound.
#![allow(async_fn_in_trait)]

/// Identifier of a stored object: a plain 128-bit unsigned integer.
pub type ObjID = u128;
3
4use std::{future::Future, io::ErrorKind, ops::Add, path::Path, thread::yield_now};
5
6use async_io::block_on;
7use libc::mode_t;
8pub use pager_dynamic::{
9    ino_to_objid, objid_to_ino, ExternalFile, ExternalFileSbHdr, ExternalKind,
10};
11use twizzler::Result;
12pub use twizzler_abi::pager::PhysRange;
13
14use crate::PAGE_SIZE;
15
16const PAGED_MEM_WIRED: u32 = 1;
17const PAGED_MEM_COMPLETED: u32 = 2;
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub struct PagedPhysMem {
20    pub range: PhysRange,
21    flags: u32,
22}
23
24impl core::ops::Add<u64> for PagedPhysMem {
25    type Output = Self;
26
27    fn add(self, rhs: u64) -> Self::Output {
28        if rhs == 0 {
29            Self {
30                range: self.range,
31                flags: self.flags,
32            }
33        } else {
34            Self {
35                range: PhysRange {
36                    start: self.range.end,
37                    end: self.range.end + PAGE_SIZE as u64,
38                },
39                flags: self.flags,
40            }
41        }
42    }
43}
44
45impl PagedPhysMem {
46    pub fn new(range: PhysRange) -> Self {
47        PagedPhysMem { range, flags: 0 }
48    }
49
50    pub fn is_completed(&self) -> bool {
51        self.flags & PAGED_MEM_COMPLETED != 0
52    }
53
54    pub fn is_wired(&self) -> bool {
55        self.flags & PAGED_MEM_WIRED != 0
56    }
57
58    pub fn set_completed(&mut self) {
59        self.flags |= PAGED_MEM_COMPLETED;
60    }
61
62    pub fn completed(mut self) -> Self {
63        self.set_completed();
64        self
65    }
66
67    pub fn wired(mut self) -> Self {
68        self.set_wired();
69        self
70    }
71
72    pub fn set_wired(&mut self) {
73        self.flags |= PAGED_MEM_WIRED;
74    }
75
76    pub fn len(&self) -> usize {
77        (self.range.end - self.range.start) as usize
78    }
79
80    pub fn nr_pages(&self) -> usize {
81        self.len() / PAGE_SIZE
82    }
83}
84
/// A run-length-encoded span of device pages, as built by `DevicePage::from_array`.
#[derive(Debug, Clone, Copy)]
pub enum DevicePage {
    /// `Run(start, len)`: `len` pages backed by consecutive device pages beginning at `start`.
    Run(u64, u32),
    /// `Hole(len)`: `len` pages with no device backing (a `0` entry in `from_array`'s input).
    Hole(u32),
}
90
91impl DevicePage {
92    pub fn from_array(array: &[u64]) -> Vec<Self, MAYHEAP_LEN> {
93        let mut tmp = Vec::<Self, MAYHEAP_LEN>::new();
94        for item in array {
95            let item = if *item == 0 {
96                DevicePage::Hole(1)
97            } else {
98                DevicePage::Run(*item, 1)
99            };
100            if let Some(prev) = tmp.last_mut() {
101                if !prev.try_extend(&item) {
102                    tmp.push(item).unwrap();
103                }
104            } else {
105                tmp.push(item).unwrap();
106            }
107        }
108        tmp
109    }
110
111    pub fn try_extend(&mut self, other: &DevicePage) -> bool {
112        let new_val = match (*self, other) {
113            (DevicePage::Hole(len1), &DevicePage::Hole(len2)) => {
114                Some(DevicePage::Hole(len1 + len2))
115            }
116            (DevicePage::Run(start1, len1), &DevicePage::Run(start2, len2)) => {
117                if start1 + len1 as u64 == start2 {
118                    Some(DevicePage::Run(start1, len1 + len2))
119                } else {
120                    None
121                }
122            }
123            _ => None,
124        };
125        if let Some(new_val) = new_val {
126            *self = new_val;
127            true
128        } else {
129            false
130        }
131    }
132
133    pub fn nr_pages(&self) -> usize {
134        match self {
135            DevicePage::Run(_, len) => *len as usize,
136            DevicePage::Hole(len) => *len as usize,
137        }
138    }
139
140    pub fn as_hole(&self) -> Option<u32> {
141        match self {
142            DevicePage::Hole(len) => Some(*len),
143            _ => None,
144        }
145    }
146
147    pub fn offset(&mut self, avail_len: &mut usize) {
148        let new = match self {
149            DevicePage::Run(start, len) => {
150                let new_len = len.saturating_sub(*avail_len as u32);
151                let diff = *len - new_len;
152                *avail_len = diff as usize;
153                DevicePage::Run(*start + diff as u64, new_len)
154            }
155            DevicePage::Hole(len) => {
156                let new_len = len.saturating_sub(*avail_len as u32);
157                let diff = *len - new_len;
158                *avail_len = diff as usize;
159                DevicePage::Hole(new_len)
160            }
161        };
162        *self = new;
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_try_extend_holes() {
        let mut hole1 = DevicePage::Hole(10);
        let hole2 = DevicePage::Hole(5);

        assert!(hole1.try_extend(&hole2));
        match hole1 {
            DevicePage::Hole(len) => assert_eq!(len, 15),
            _ => panic!("Expected Hole"),
        }
    }

    #[test]
    fn test_try_extend_consecutive_runs() {
        let mut run1 = DevicePage::Run(100, 10);
        let run2 = DevicePage::Run(110, 5);

        assert!(run1.try_extend(&run2));
        match run1 {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 15);
            }
            _ => panic!("Expected Run"),
        }
    }

    #[test]
    fn test_try_extend_non_consecutive_runs() {
        let mut run1 = DevicePage::Run(100, 10);
        let run2 = DevicePage::Run(120, 5);

        assert!(!run1.try_extend(&run2));
        match run1 {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 10); // Should remain unchanged
            }
            _ => panic!("Expected Run"),
        }
    }

    #[test]
    fn test_try_extend_mixed_types() {
        let mut hole = DevicePage::Hole(10);
        let run = DevicePage::Run(100, 5);

        assert!(!hole.try_extend(&run));
        match hole {
            DevicePage::Hole(len) => assert_eq!(len, 10), // Should remain unchanged
            _ => panic!("Expected Hole"),
        }

        let mut run = DevicePage::Run(100, 10);
        let hole = DevicePage::Hole(5);

        assert!(!run.try_extend(&hole));
        match run {
            DevicePage::Run(start, len) => {
                assert_eq!(start, 100);
                assert_eq!(len, 10); // Should remain unchanged
            }
            _ => panic!("Expected Run"),
        }
    }

    #[test]
    fn test_from_array_merges_runs_and_holes() {
        // 0 entries are holes; 5,6 are consecutive; 8 starts a new run.
        let pages = DevicePage::from_array(&[0, 0, 5, 6, 8]);
        assert_eq!(pages.len(), 3);
        assert!(matches!(pages[0], DevicePage::Hole(2)));
        assert!(matches!(pages[1], DevicePage::Run(5, 2)));
        assert!(matches!(pages[2], DevicePage::Run(8, 1)));
    }

    #[test]
    fn test_from_array_empty() {
        assert!(DevicePage::from_array(&[]).is_empty());
    }

    #[test]
    fn test_offset_partial_run() {
        let mut page = DevicePage::Run(100, 10);
        let mut avail = 3;
        page.offset(&mut avail);
        assert!(matches!(page, DevicePage::Run(103, 7)));
        assert_eq!(avail, 3);
    }

    #[test]
    fn test_offset_past_end_of_hole() {
        let mut page = DevicePage::Hole(4);
        let mut avail = 10;
        page.offset(&mut avail);
        assert!(matches!(page, DevicePage::Hole(0)));
        assert_eq!(avail, 4);
    }

    #[test]
    fn test_nr_pages_and_as_hole() {
        assert_eq!(DevicePage::Run(7, 3).nr_pages(), 3);
        assert_eq!(DevicePage::Hole(5).nr_pages(), 5);
        assert_eq!(DevicePage::Hole(5).as_hole(), Some(5));
        assert_eq!(DevicePage::Run(7, 3).as_hole(), None);
    }
}
236
237pub trait PagedDevice {
238    /// Append the needed paged phys mem for this device page, return the number of appended pages.
239    async fn phys_addrs(
240        &self,
241        _start: DevicePage,
242        _phys_list: &mut Vec<PagedPhysMem, MAYHEAP_LEN>,
243    ) -> Result<usize> {
244        Err(std::io::ErrorKind::Unsupported.into())
245    }
246
247    async fn sequential_read(&self, start: u64, list: &[PhysRange]) -> Result<usize>;
248    async fn sequential_write(&self, start: u64, list: &[PhysRange]) -> Result<usize>;
249
250    async fn len(&self) -> Result<usize>;
251
252    fn yield_now(&self) {
253        yield_now();
254    }
255
256    fn run_async<R: 'static>(&self, f: impl Future<Output = R>) -> R {
257        block_on(f)
258    }
259}
260
261use mayheap::Vec;
262
263pub const MAYHEAP_LEN: usize = 16;
264#[derive(Debug)]
265pub struct PageRequest {
266    pub start_page: i64,
267    pub nr_pages: u32,
268    pub completed: u32,
269    pub phys_list: Vec<PagedPhysMem, MAYHEAP_LEN>,
270}
271
272impl PageRequest {
273    pub fn new(start_page: i64, nr_pages: u32) -> Self {
274        Self {
275            start_page,
276            phys_list: Vec::new(),
277            nr_pages,
278            completed: 0,
279        }
280    }
281
282    pub fn new_from_list(
283        phys_list: Vec<PagedPhysMem, MAYHEAP_LEN>,
284        start_page: i64,
285        nr_pages: u32,
286    ) -> Self {
287        Self {
288            start_page,
289            phys_list,
290            nr_pages,
291            completed: 0,
292        }
293    }
294
295    pub fn into_list(self) -> Vec<PagedPhysMem, MAYHEAP_LEN> {
296        self.phys_list
297    }
298
299    async fn setup_phys<PD: PagedDevice>(
300        &mut self,
301        disk_pages: &[DevicePage],
302        device: &PD,
303    ) -> Result<()> {
304        // TODO: recover these
305        self.phys_list.clear();
306        for page in disk_pages {
307            let mut count = 0;
308            while count < page.nr_pages() {
309                match device.phys_addrs(*page, &mut self.phys_list).await {
310                    Ok(r) => {
311                        if r == 0 {
312                            break;
313                        }
314                        count += r;
315                    }
316                    Err(e) if Into::<std::io::Error>::into(e).kind() == ErrorKind::OutOfMemory => {
317                        if self.phys_list.is_empty() {
318                            tracing::error!(": {}", e);
319                            return Err(e);
320                        } else {
321                            break;
322                        }
323                    }
324                    Err(e) => {
325                        tracing::error!(": {}", e);
326                        Err(e)?
327                    }
328                }
329            }
330
331            if count < page.nr_pages() {
332                break;
333            }
334        }
335
336        self.nr_pages =
337            self.phys_list
338                .iter()
339                .fold(0usize, |acc, range| acc + range.len() / PAGE_SIZE) as u32;
340        self.completed =
341            self.phys_list
342                .iter()
343                .filter(|p| p.is_completed())
344                .fold(0usize, |acc, range| acc + range.len() / PAGE_SIZE) as u32;
345        Ok(())
346    }
347
348    pub async fn page_in<PD: PagedDevice>(
349        &mut self,
350        disk_pages: &[DevicePage],
351        device: &PD,
352    ) -> Result<usize> {
353        self.setup_phys(disk_pages, device).await?;
354        if self.phys_list.iter().all(|p| p.is_completed()) {
355            return Ok(self.nr_pages as usize);
356        }
357
358        let mut cursor = 0;
359        let mut inner_cursor = 0;
360        let mut tfer_count = 0;
361        let mut tmp: Vec<PhysRange, MAYHEAP_LEN> = Vec::new();
362        for disk_page in disk_pages {
363            let mut count = 0;
364            tmp.clear();
365            while count < disk_page.nr_pages() {
366                let thislen = (disk_page.nr_pages() - count)
367                    .min(self.phys_list[cursor].nr_pages() - inner_cursor);
368
369                let new_range = PhysRange {
370                    start: self.phys_list[cursor].range.start + (inner_cursor * PAGE_SIZE) as u64,
371                    end: self.phys_list[cursor].range.start
372                        + (inner_cursor * PAGE_SIZE) as u64
373                        + (thislen * PAGE_SIZE) as u64,
374                };
375
376                tmp.push(new_range).unwrap();
377
378                inner_cursor += thislen;
379                if inner_cursor >= self.phys_list[cursor].nr_pages() {
380                    cursor += 1;
381                    inner_cursor = 0;
382                    if cursor >= self.phys_list.len() {
383                        break;
384                    }
385                }
386
387                count += thislen;
388            }
389
390            if let DevicePage::Run(start, _len) = disk_page {
391                let mut count = 0;
392                let mut idx = 0;
393                while idx < tmp.len() {
394                    let mut r = device
395                        .sequential_read(*start + count as u64, &tmp[idx..])
396                        .await
397                        .inspect_err(|e| tracing::error!("read err: {}", e))?;
398
399                    while r > 0 {
400                        let thiscount: usize = tmp[idx].page_count().min(r);
401                        if tmp[idx].page_count() == thiscount {
402                            idx += 1;
403                        } else {
404                            tmp[idx] = PhysRange {
405                                start: tmp[idx].start + (thiscount * PAGE_SIZE) as u64,
406                                end: tmp[idx].end,
407                            };
408                        }
409                        r -= thiscount;
410                        count += thiscount;
411                    }
412                }
413            }
414
415            tfer_count += count;
416
417            if cursor >= self.phys_list.len() {
418                break;
419            }
420        }
421
422        Ok(tfer_count)
423    }
424
425    pub async fn page_out<PD: PagedDevice>(
426        &mut self,
427        disk_pages: &[DevicePage],
428        device: &PD,
429    ) -> Result<usize> {
430        let mut cursor = 0;
431        let mut inner_cursor = 0;
432        let mut tfer_count = 0;
433        let mut tmp: Vec<PhysRange, MAYHEAP_LEN> = Vec::new();
434        for disk_page in disk_pages {
435            let mut count = 0;
436            tmp.clear();
437            while count < disk_page.nr_pages() {
438                let thislen = (disk_page.nr_pages() - count)
439                    .min(self.phys_list[cursor].nr_pages() - inner_cursor);
440
441                let new_range = PhysRange {
442                    start: self.phys_list[cursor].range.start + (inner_cursor * PAGE_SIZE) as u64,
443                    end: self.phys_list[cursor].range.start
444                        + (inner_cursor * PAGE_SIZE) as u64
445                        + (thislen * PAGE_SIZE) as u64,
446                };
447
448                tmp.push(new_range).unwrap();
449
450                inner_cursor += thislen;
451                if inner_cursor >= self.phys_list[cursor].nr_pages() {
452                    cursor += 1;
453                    inner_cursor = 0;
454                    if cursor >= self.phys_list.len() {
455                        break;
456                    }
457                }
458
459                count += thislen;
460            }
461
462            if let DevicePage::Run(start, _len) = disk_page {
463                let mut count = 0;
464                let mut idx = 0;
465                while idx < tmp.len() {
466                    let mut r = device
467                        .sequential_write(*start + count as u64, &tmp[idx..])
468                        .await
469                        .inspect_err(|e| tracing::error!("write err: {}", e))?;
470
471                    while r > 0 {
472                        let thiscount: usize = tmp[idx].page_count().min(r);
473                        if tmp[idx].page_count() == thiscount {
474                            idx += 1;
475                        } else {
476                            tmp[idx] = PhysRange {
477                                start: tmp[idx].start + (thiscount * PAGE_SIZE) as u64,
478                                end: tmp[idx].end,
479                            };
480                        }
481                        r -= thiscount;
482                        count += thiscount;
483                    }
484                }
485            }
486
487            tfer_count += count;
488
489            if cursor >= self.phys_list.len() {
490                break;
491            }
492        }
493
494        Ok(tfer_count)
495    }
496}
497
498pub trait PagedObjectStore {
499    async fn get_config_id(&self) -> Result<ObjID> {
500        let mut buf = [0; 16];
501        self.read_object(0, 0, &mut buf).await.and_then(|len| {
502            if len == 16 && buf.iter().find(|x| **x != 0).is_some() {
503                Ok(ObjID::from_le_bytes(buf))
504            } else {
505                Err(ErrorKind::InvalidData.into())
506            }
507        })
508    }
509
510    async fn set_config_id(&self, id: ObjID) -> Result<()> {
511        let _ = self.delete_object(0).await;
512        self.create_object(0).await?;
513        self.write_object(0, 0, &id.to_le_bytes()).await
514    }
515
516    async fn create_object(&self, id: ObjID) -> Result<()>;
517    async fn delete_object(&self, id: ObjID) -> Result<()>;
518
519    async fn len(&self, id: ObjID) -> Result<u64>;
520
521    async fn read_object(&self, id: ObjID, offset: u64, buf: &mut [u8]) -> Result<usize>;
522    async fn write_object(&self, id: ObjID, offset: u64, buf: &[u8]) -> Result<()>;
523
524    async fn page_in_object<'a>(&self, id: ObjID, reqs: &'a mut [PageRequest]) -> Result<usize>;
525    async fn page_out_object<'a>(&self, id: ObjID, reqs: &'a mut [PageRequest]) -> Result<usize>;
526
527    async fn flush(&self) -> Result<()> {
528        Ok(())
529    }
530}
531
532bitflags::bitflags! {
533    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
534    pub struct ExternalOpenFlags: u32 {
535        const READ = 0b0001;
536        const WRITE = 0b0010;
537        const CREATE = 0b0100;
538        const TRUNCATE = 0b1000;
539    }
540}
541
542pub trait ExternalFileStore {
543    async fn open_external(
544        &self,
545        at: Option<ObjID>,
546        path: impl AsRef<Path>,
547        flags: ExternalOpenFlags,
548        mode: mode_t,
549        link_to: Option<ObjID>,
550    ) -> Result<ExternalFile>;
551
552    async fn unlink_external(&self, at: Option<ObjID>, path: impl AsRef<Path>) -> Result<()>;
553    async fn readlink_external(&self, id: ObjID) -> Result<String>;
554
555    async fn readdir_external(
556        &self,
557        dir: ObjID,
558        skip: usize,
559        count: usize,
560        entries: &mut std::vec::Vec<ExternalFile>,
561    ) -> Result<()>;
562
563    async fn link_external(
564        &self,
565        file: &ExternalFile,
566        at: Option<ObjID>,
567        path: impl AsRef<Path>,
568    ) -> Result<()>;
569
570    async fn stat_external(&self, path: impl AsRef<Path>) -> Result<libc::stat>;
571    async fn fstat_external(&self, file: Option<ObjID>) -> Result<libc::stat>;
572
573    async fn symlink_external(
574        &self,
575        at: Option<ObjID>,
576        target: impl AsRef<Path>,
577        linkpath: impl AsRef<Path>,
578    ) -> Result<()>;
579}
580
/// Splits `data` into maximal sub-slices in which every element equals its predecessor
/// `+ 1`. The sub-slices are yielded in order; an empty input yields nothing.
pub(crate) fn _consecutive_slices<T: PartialEq + Add<u64> + Copy>(
    data: &[T],
) -> impl Iterator<Item = &[T]>
where
    T::Output: PartialEq<T>,
{
    let mut remaining = data;
    std::iter::from_fn(move || {
        if remaining.is_empty() {
            return None;
        }
        // Length of the leading run: one element plus every adjacent pair that still links up.
        let linked = remaining
            .windows(2)
            .take_while(|pair| pair[0] + 1u64 == pair[1])
            .count();
        let (run, rest) = remaining.split_at(linked + 1);
        remaining = rest;
        Some(run)
    })
}
598
599pub trait PosIo {
600    async fn read(&self, start: u64, buf: &mut [u8]) -> Result<usize>;
601    async fn write(&self, start: u64, buf: &[u8]) -> Result<usize>;
602}