// object_store/ext4.rs

1#[cfg(not(target_os = "twizzler"))]
2use std::io::Result;
3use std::{
4    collections::HashMap,  ffi::CString, io::{ErrorKind, Read, Seek, SeekFrom, Write}, path::Path, sync::{
5        Mutex, MutexGuard, atomic::{AtomicU64, Ordering}
6    }, time::Instant
7};
8
9use libc::{PATH_MAX, mode_t};
10use lwext4_rs::{
11    Ext4Blockdev, Ext4BlockdevIface, Ext4File, Ext4Fs, MpLock, O_CREAT, O_RDONLY, O_RDWR,
12};
13use mayheap::Vec;
14use pager_dynamic::{ino_to_objid, objid_to_ino, ExternalFile};
15#[cfg(target_os = "twizzler")]
16use twizzler::Result;
17
18use crate::{
19    paged_object_store::MAYHEAP_LEN, DevicePage, ExternalFileStore, ExternalOpenFlags, ObjID,
20    PagedDevice, PagedObjectStore, PosIo, PAGE_SIZE,
21};
22
23pub struct Ext4Store<D: Device> {
24    fs: Mutex<Ext4Fs>,
25    ino_cache: Mutex<HashMap<ObjID, u32>>,
26    len_cache: Mutex<HashMap<ObjID, u64>>,
27    device: D,
28}
29
30pub trait Device: PosIo + PagedDevice + Sync + Send + Clone + 'static {}
31
32impl<T: PosIo + PagedDevice + Sync + Send + Clone + 'static> Device for T {}
33
34struct Ext4Bd<D: Device> {
35    device: D,
36    phys_bcount: u64,
37    lock: MpLock,
38}
39
40impl<D: Device> Ext4BlockdevIface for Ext4Bd<D> {
41    fn phys_block_size(&mut self) -> u32 {
42        PHYSICAL_BSIZE
43    }
44
45    fn phys_block_count(&mut self) -> u64 {
46        self.phys_bcount
47    }
48
49    fn open(&mut self) -> std::io::Result<()> {
50        Ok(())
51    }
52
53    fn close(&mut self) -> std::io::Result<()> {
54        Ok(())
55    }
56
57    fn read(&mut self, buf: *mut u8, block: u64, bcount: u32) -> std::io::Result<u32> {
58        let start = block * PHYSICAL_BSIZE as u64;
59        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
60        let slice = unsafe { core::slice::from_raw_parts_mut(buf, len as usize) };
61        let len = self.device.run_async(self.device.read(start, slice))?;
62        Ok((len / PHYSICAL_BSIZE as usize) as u32)
63    }
64
65    fn write(&mut self, buf: *const u8, block: u64, bcount: u32) -> std::io::Result<u32> {
66        let start = block * PHYSICAL_BSIZE as u64;
67        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
68        let slice = unsafe { core::slice::from_raw_parts(buf, len as usize) };
69        let len = self.device.run_async(self.device.write(start, slice))?;
70        Ok((len / PHYSICAL_BSIZE as usize) as u32)
71    }
72
73    fn lock(&self) -> std::io::Result<()> {
74        self.lock.lock();
75        Ok(())
76    }
77
78    fn unlock(&self) -> std::io::Result<()> {
79        self.lock.unlock();
80        Ok(())
81    }
82}
83
84impl<D: Device> Ext4Bd<D> {
85    fn new(device: D, _name: &str, phys_bcount: u64) -> Self {
86        Self {
87            device,
88            phys_bcount,
89            lock: MpLock::new(),
90        }
91    }
92}
93
/// Monotonic counter used to give every block device a distinct `blockdev-N` name.
static BDEV_ID: AtomicU64 = AtomicU64::new(0);

/// Logical block size, in bytes, announced when the block device is created.
const LOGICAL_BSIZE: u32 = 512;
/// Physical block size, in bytes, used for all raw device reads and writes.
const PHYSICAL_BSIZE: u32 = 512;
98
99impl<D: Device> Ext4Store<D> {
100    pub async fn new(device: D, name: &str) -> Result<Self> {
101        let bdname = format!("blockdev-{}", BDEV_ID.fetch_add(1, Ordering::SeqCst));
102        let max = device.len().await? as u64;
103        let bcount = max / LOGICAL_BSIZE as u64;
104        let phys_bcount = max / PHYSICAL_BSIZE as u64;
105        let bd = Ext4Blockdev::new(
106            Ext4Bd::new(device.clone(), bdname.as_str(), phys_bcount),
107            LOGICAL_BSIZE,
108            bcount,
109            name,
110        )?;
111
112        let mut fs = Ext4Fs::new(bd, CString::new(name).unwrap(), false)?;
113
114        match fs.create_dir("ids") {
115            Err(e) if e.kind() != ErrorKind::AlreadyExists => {
116                return Err(e.into());
117            }
118            _ => {}
119        }
120
121        Ok(Self {
122            fs: Mutex::new(fs),
123            device,
124            len_cache: Mutex::new(HashMap::default()),
125            ino_cache: Mutex::new(HashMap::default()),
126        })
127    }
128
129    fn get_len_from_cache(&self, id: ObjID) -> Option<u64> {
130        self.len_cache.lock().unwrap().get(&id).copied()
131    }
132
133    async fn readlink(&self, id: ObjID) -> Result<String> {
134        let mut buf = vec![0; PATH_MAX as usize];
135        let len = self.read_object(id, 0, &mut buf).await?;
136        buf.truncate(len);
137        String::from_utf8(buf).map_err(|_| ErrorKind::InvalidData.into())
138    }
139
140    fn invalidate_len(&self, id: ObjID) {
141        self.len_cache.lock().unwrap().remove(&id);
142    }
143
144    fn set_len_in_cache(&self, id: ObjID, len: u64) {
145        self.len_cache.lock().unwrap().insert(id, len);
146    }
147
148    pub fn get_id_path(&self, id: ObjID) -> (String, String) {
149        let top = id.to_be_bytes()[0];
150        let us = format!("ids/{:x}", top);
151        (us, format!("ids/{:x}/{:x}", top, id))
152    }
153
154    pub fn set_len(&self, id: ObjID, len: u64) -> Result<()> {
155        let mut fs = self.fs.lock().unwrap();
156        let mut file = self.get_object_as_file(&mut fs, id, false)?;
157        file.truncate(len)?;
158        self.set_len_in_cache(id, len);
159        Ok(())
160    }
161
162    pub fn lookup_ino_cache(&self, id: ObjID) -> Option<u32> {
163        self.ino_cache.lock().unwrap().get(&id).copied()
164    }
165
166    pub fn insert_ino_cache(&self, id: ObjID, ino: u32) {
167        self.ino_cache.lock().unwrap().insert(id, ino);
168    }
169
170    pub fn remove_ino_cache(&self, id: ObjID) {
171        self.ino_cache.lock().unwrap().remove(&id);
172    }
173
174    fn do_get_object_as_file<'a>(
175        &self,
176        fs: &'a mut MutexGuard<'_, Ext4Fs>,
177        id: ObjID,
178        create: bool,
179    ) -> Result<Ext4File<'a>> {
180        let flags = if create { O_RDWR | O_CREAT } else { O_RDWR };
181        if let Some(ino) = objid_to_ino(id) {
182            return Ok(fs.open_file_from_inode(ino, flags)?);
183        }
184        let path = self.get_id_path(id);
185        if create {
186            match fs.create_dir(&path.0) {
187                Ok(_) => {}
188                Err(e) if e.kind() == ErrorKind::AlreadyExists => {}
189                Err(e) => Err(e)?,
190            }
191        }
192        Ok(fs.open_file(&path.1, flags)?)
193    }
194
195    pub fn get_object_as_file<'a>(
196        &self,
197        fs: &'a mut MutexGuard<'_, Ext4Fs>,
198        id: ObjID,
199        create: bool,
200    ) -> Result<Ext4File<'a>> {
201        if let Some(ino) = self.lookup_ino_cache(id) {
202            return Ok(fs.open_file_from_inode(ino, O_RDWR)?);
203        }
204        let mut file = self.do_get_object_as_file(fs, id, create)?;
205        self.insert_ino_cache(id, file.get_file_inode()?.num());
206        Ok(file)
207    }
208}
209
210impl<D: Device> PagedObjectStore for Ext4Store<D> {
211    async fn create_object(&self, id: crate::ObjID) -> Result<()> {
212        let mut fs = self.fs.lock().unwrap();
213        self.get_object_as_file(&mut fs, id, true)?;
214        fs.flush()?;
215        Ok(())
216    }
217
218    async fn delete_object(&self, id: crate::ObjID) -> Result<()> {
219        let path = self.get_id_path(id);
220        let mut fs = self.fs.lock().unwrap();
221        fs.remove_file(&path.1)?;
222        self.remove_ino_cache(id);
223        self.invalidate_len(id);
224        fs.flush()?;
225        Ok(())
226    }
227
228    async fn len(&self, id: crate::ObjID) -> Result<u64> {
229        if let Some(len) = self.get_len_from_cache(id) {
230            return Ok(len);
231        }
232        let mut fs = self.fs.lock().unwrap();
233        let mut file = self.get_object_as_file(&mut fs, id, false)?;
234        self.set_len_in_cache(id, file.len());
235        Ok(file.len())
236    }
237
238    async fn read_object(&self, id: crate::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
239        let mut fs = self.fs.lock().unwrap();
240        let mut file = self.get_object_as_file(&mut fs, id, false)?;
241        file.seek(SeekFrom::Start(offset))?;
242        Ok(file.read(buf)?)
243    }
244
245    async fn write_object(&self, id: crate::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
246        let mut fs = self.fs.lock().unwrap();
247        let mut file = self.get_object_as_file(&mut fs, id, false)?;
248        if offset > file.len() {
249            file.ensure_backing(offset)
250                .inspect_err(|e| tracing::warn!("failed to ensure backing for object: {}", e))?;
251            file.truncate(offset).inspect_err(|e| {
252                tracing::warn!("failed to initialize object to {}: {}", offset, e)
253            })?;
254            self.invalidate_len(id);
255        }
256        file.seek(SeekFrom::Start(offset))?;
257        // TODO
258        file.write(buf)?;
259        drop(file);
260        fs.flush()?;
261        Ok(())
262    }
263
264    async fn page_in_object<'a>(
265        &self,
266        id: ObjID,
267        reqs: &'a mut [crate::PageRequest],
268    ) -> Result<usize> {
269        let mut fs = self.fs.lock().unwrap();
270        let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
271        let mut file = self
272            .get_object_as_file(&mut fs, id, false)
273            .inspect_err(|e| tracing::error!("go err: {}", e))?;
274        let mut inode = file
275            .get_file_inode()
276            .inspect_err(|e| tracing::error!("gfi err: {}", e))?;
277        let max_len = inode.size();
278        tracing::trace!("paging  in request for {} reqs", reqs.len());
279
280        let mut iters = 0;
281        drop(file);
282        drop(fs);
283        let mut blocks = reqs
284            .iter_mut()
285            .map(|req| {
286                let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
287
288                let mut page = req.start_page;
289                let end = req.start_page + req.nr_pages as i64;
290
291                let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
292                if rem_blocks > 64
293                    && page as usize * PAGE_SIZE >= (max_len as usize + PAGE_SIZE * 8)
294                {
295                    let _ = disk_pages.push(DevicePage::Hole(rem_blocks));
296                } else {
297                    let mut fs = self.fs.lock().unwrap();
298                    while page < end {
299                        iters += 1;
300                        if iters % 100 == 0 {
301                            drop(fs);
302                            self.device.yield_now();
303                            fs = self.fs.lock().unwrap();
304                        }
305
306                        let mut block = page as u32;
307                        if objid_to_ino(id).is_some() && block > 0 {
308                            // External files don't have null pages
309                            block -= 1;
310                        }
311                        block = block * blocks_per_page as u32;
312                        let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
313
314                        let item = match inode.get_data_blocks(block, rem_blocks, false) {
315                            Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
316                                if dblock == 0 {
317                                    DevicePage::Hole(nr_dblk)
318                                } else {
319                                    DevicePage::Run(dblock, nr_dblk)
320                                }
321                            }
322                            _ => match inode.get_data_block(block, false)? {
323                                0 => DevicePage::Hole(1),
324                                dpg => DevicePage::Run(dpg, 1),
325                            },
326                        };
327                        page += item.nr_pages() as i64;
328                        if let Some(prev) = disk_pages.last_mut() {
329                            if !prev.try_extend(&item) {
330                                disk_pages.push(item).unwrap();
331                            }
332                        } else {
333                            disk_pages.push(item).unwrap();
334                        }
335                    }
336                }
337                Result::Ok((req, disk_pages))
338            })
339            .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
340        for br in blocks.iter_mut() {
341            let pages = &br.1[..];
342            tracing::trace!("paging in {:?}", pages);
343            let _len = br.0.page_in(pages, &self.device).await?;
344        }
345
346        Ok(reqs.len())
347    }
348
349    async fn page_out_object<'a>(
350        &self,
351        id: ObjID,
352        reqs: &'a mut [crate::PageRequest],
353    ) -> Result<usize> {
354        let end_offset = reqs
355            .iter()
356            .max_by_key(|req| req.start_page as u64 + req.nr_pages as u64)
357            .map(|end_req| {
358                (end_req.start_page as u64 + end_req.nr_pages as u64) * PAGE_SIZE as u64
359            });
360
361        let start = Instant::now();
362        let mut fs = self.fs.lock().unwrap();
363        let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
364        let mut file = self.get_object_as_file(&mut fs, id, false)?;
365        if end_offset.unwrap_or(0) >= file.len() {
366            drop(file);
367            drop(fs);
368            self.write_object(id, end_offset.unwrap_or(0), &[0u8; PAGE_SIZE])
369                .await?;
370            fs = self.fs.lock().unwrap();
371        } else {
372            drop(file);
373        }
374        let mut file = self.get_object_as_file(&mut fs, id, false)?;
375        let mut inode = file.get_file_inode()?;
376        drop(file);
377        drop(fs);
378        tracing::trace!("paging out {:x} request for {} reqs", id, reqs.len());
379
380        let setup_done = Instant::now();
381        let mut iters = 0;
382        let mut blocks = reqs
383            .iter_mut()
384            .map(|req| {
385                let mut fs = self.fs.lock().unwrap();
386                let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
387
388                let mut page = req.start_page;
389                let end = req.start_page + req.nr_pages as i64;
390                while page < end {
391                    let mut block = page as u32;
392                    tracing::trace!("paging out block {}",  block);
393                    if objid_to_ino(id).is_some() && block > 0 {
394                        // External files don't have null pages
395                        block -= 1;
396                    }
397
398                    iters += 1;
399                    if iters % 100 == 0 {
400                        drop(fs);
401                        self.device.yield_now();
402                        fs = self.fs.lock().unwrap();
403                    }
404
405                    block = block * blocks_per_page as u32;
406                    let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
407
408                    let item = match inode.get_data_blocks(block, rem_blocks, true) {
409                        Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
410                            if dblock == 0 {
411                                tracing::warn!(
412                                    "got unexpected zero block when paging out object {:x}",
413                                    id
414                                );
415                                Result::Err(ErrorKind::Other.into())?
416                            } else {
417                                DevicePage::Run(dblock, nr_dblk)
418                            }
419                        }
420                        _ => match inode.get_data_block(block, true).inspect_err(|e| tracing::warn!("failed to get_data_block: {}", e))? {
421                            0 => {
422                                tracing::warn!(
423                                    "got unexpected zero block when paging out object {:x} in fallback",
424                                    id
425                                );
426                                Result::Err(ErrorKind::Other.into())?
427                            }
428                            dpg => DevicePage::Run(dpg, 1),
429                        },
430                    };
431                    page += item.nr_pages() as i64;
432                    if let Some(prev) = disk_pages.last_mut() {
433                        if !prev.try_extend(&item) {
434                            disk_pages.push(item).unwrap();
435                        }
436                    } else {
437                        disk_pages.push(item).unwrap();
438                    }
439                }
440                Result::Ok((req, disk_pages))
441            })
442            .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
443        tracing::trace!(
444            "found blocks for paging out in {}ms",
445            (Instant::now() - setup_done).as_millis()
446        );
447
448        let blocks_found = Instant::now();
449        for br in blocks.iter_mut() {
450            let pages = &br.1[..];
451            let _len = br.0.page_out(pages, &self.device).await?;
452        }
453        let mut fs = self.fs.lock().unwrap();
454        fs.flush()?;
455        let io_done = Instant::now();
456        tracing::trace!(
457            "==> {}ms {}ms {}ms",
458            (setup_done - start).as_millis(),
459            (blocks_found - setup_done).as_millis(),
460            (io_done - blocks_found).as_millis()
461        );
462        Ok(reqs.len())
463    }
464}
465
466impl<D: Device> ExternalFileStore for Ext4Store<D> {
467    async fn open_external(
468        &self,
469        at: Option<ObjID>,
470        path: impl AsRef<Path>,
471        flags: ExternalOpenFlags,
472        mode: mode_t,
473        link_to: Option<ObjID>,
474    ) -> Result<ExternalFile> {
475        let mut at_ino = if let Some(at) = at {
476            objid_to_ino(at).ok_or(ErrorKind::InvalidInput)?
477        } else {
478            2
479        };
480        if at_ino < 2 {
481            at_ino = 2;
482        }
483        tracing::trace!(
484            "opening external file at {:?} with flags {:?} and mode {:o} at ino {}, link_to = {:?}",
485            path.as_ref(),
486            flags,
487            mode, at_ino
488            ,link_to
489        );
490
491        let mut fs = self.fs.lock().unwrap();
492
493        let mut oflags = if flags.contains(ExternalOpenFlags::READ)
494            && flags.contains(ExternalOpenFlags::WRITE)
495        {
496            O_RDWR
497        } else if flags.contains(ExternalOpenFlags::READ) {
498            O_RDONLY
499        } else {
500            O_RDWR
501        };
502
503        if flags.contains(ExternalOpenFlags::CREATE) {
504            oflags |= O_CREAT;
505        }
506
507        if let Some(link_to) = link_to {
508            fs.link(path.as_ref().to_string_lossy().as_ref(),
509                at_ino,
510                objid_to_ino(link_to).ok_or(ErrorKind::InvalidInput)?,
511            ).inspect_err(|e| tracing::warn!("failed to link: {}", e))?;
512        }
513
514        let mut file = fs.open_file_from_container(
515            at_ino,
516            path.as_ref().to_string_lossy().as_ref(),
517            oflags,
518            mode,
519        )?;
520
521        Ok(ExternalFile::new(
522            path.as_ref().to_string_lossy().to_string(),
523            file.get_file_inode()?.kind().into(),
524            ino_to_objid(file.get_file_inode()?.num()),
525        ))
526    }
527
528    async fn unlink_external(&self, at: Option<ObjID>, path: impl AsRef<Path>) -> Result<()> {
529        if at.is_some() {
530            return Err(ErrorKind::Unsupported.into());
531        }
532        let mut fs = self.fs.lock().unwrap();
533        fs.remove_file(path.as_ref().to_string_lossy().as_ref())?;
534        return Ok(());
535    }
536
537    async fn readlink_external(&self, at: ObjID) -> Result<String> {
538        self.readlink(at).await
539    }
540
541    async fn readdir_external(
542        &self,
543        dir: ObjID,
544        skip: usize,
545        count: usize,
546        entries: &mut std::vec::Vec<ExternalFile>,
547    ) -> Result<()> {
548        entries.clear();
549        tracing::trace!(
550            "enumerating external namespace {:x} (skip {}, count {})",
551            dir,
552            skip,
553            count
554        );
555        let mut fs = self.fs.lock().unwrap();
556        let mut inonr = objid_to_ino(dir).ok_or(ErrorKind::InvalidInput)?;
557        if inonr == 0 {
558            inonr = 2;
559        }
560
561        let mut inode = fs.get_inode(inonr)?;
562        let diriter = fs.dirents(&mut inode)?;
563
564        let diriter = diriter.skip(skip).take(count).filter_map(|de| {
565            de.1.ok().map(|ino| {
566                ExternalFile::new(
567                    unsafe { str::from_utf8_unchecked(&de.0) },
568                    ino.kind().into(),
569                    ino_to_objid(ino.num()),
570                )
571            })
572        });
573
574        for entry in diriter {
575            tracing::trace!("record external file {} in namespace {:x} with ID {} and kind {:?}",
576                entry.name().unwrap_or("<invalid utf8>"),
577                dir,
578                entry.id,
579                entry.kind
580            );
581            entries.push(entry)
582        }
583        tracing::trace!("collected {} entries", entries.len());
584
585        Ok(())
586    }
587
588    async fn link_external(
589        &self,
590        _file: &ExternalFile,
591        _at: Option<ObjID>,
592        _path: impl AsRef<Path>,
593    ) -> Result<()> {
594        todo!()
595    }
596
597    async fn stat_external(&self, _path: impl AsRef<Path>) -> Result<libc::stat> {
598        todo!()
599    }
600
601    async fn fstat_external(&self, _file: Option<ObjID>) -> Result<libc::stat> {
602        todo!()
603    }
604
605    async fn symlink_external(
606        &self,
607        _at: Option<ObjID>,
608        _target: impl AsRef<Path>,
609        _linkpath: impl AsRef<Path>,
610    ) -> Result<()> {
611        todo!()
612    }
613}