object_store/
ext4.rs

1#[cfg(not(target_os = "twizzler"))]
2use std::io::Result;
3use std::{
4    collections::HashMap,
5    ffi::CString,
6    io::{ErrorKind, Read, Seek, SeekFrom, Write},
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Mutex, MutexGuard,
10    },
11    time::Instant,
12};
13
14use efs::fs::ext2::inode::ROOT_DIRECTORY_INODE;
15use lwext4_rs::{
16    Ext4Blockdev, Ext4BlockdevIface, Ext4File, Ext4Fs, FileKind, MpLock, O_CREAT, O_RDWR,
17};
18use mayheap::Vec;
19#[cfg(target_os = "twizzler")]
20use twizzler::Result;
21
22use crate::{
23    ino_to_objid, objid_to_ino, paged_object_store::MAYHEAP_LEN, DevicePage, ExternalFile,
24    ExternalKind, ObjID, PagedDevice, PagedObjectStore, PosIo, PAGE_SIZE,
25};
26
27#[derive(Default)]
28struct ExtCache {
29    ids: HashMap<ObjID, (ExternalFile, usize)>,
30    names: HashMap<u32, HashMap<String, (ExternalFile, usize)>>,
31}
32
33#[allow(dead_code)]
34impl ExtCache {
35    pub fn fill_dir(&mut self, ino: u32, items: impl Iterator<Item = (ExternalFile, usize)>) {
36        let entry = self.names.entry(ino).or_default();
37        for item in items {
38            if let Some(name) = item.0.name() {
39                entry.insert(name.to_owned(), item.clone());
40                self.ids.insert(item.0.id.into(), item);
41            }
42        }
43    }
44
45    pub fn readdir(&self, ino: u32) -> Option<std::vec::Vec<ExternalFile>> {
46        let entry = self.names.get(&ino)?;
47        Some(entry.values().map(|e| e.0).collect())
48    }
49
50    pub fn reset_dir(&mut self, ino: u32) {
51        if let Some(mut map) = self.names.remove(&ino) {
52            for item in map.drain() {
53                self.ids.remove(&item.1 .0.id.into());
54            }
55        }
56    }
57
58    pub fn lookup(&self, ino: u32, name: &str) -> Option<(ExternalFile, usize)> {
59        let map = self.names.get(&ino)?;
60        map.get(name).copied()
61    }
62
63    pub fn get_by_id(&self, id: ObjID) -> Option<(ExternalFile, usize)> {
64        self.ids.get(&id).copied()
65    }
66}
67
68pub struct Ext4Store<D: Device> {
69    fs: Mutex<Ext4Fs>,
70    ext_cache: Mutex<ExtCache>,
71    len_cache: Mutex<HashMap<ObjID, u64>>,
72    device: D,
73}
74
75pub trait Device: PosIo + PagedDevice + Sync + Send + Clone + 'static {}
76
77impl<T: PosIo + PagedDevice + Sync + Send + Clone + 'static> Device for T {}
78
79struct Ext4Bd<D: Device> {
80    device: D,
81    phys_bcount: u64,
82    lock: MpLock,
83}
84
85impl<D: Device> Ext4BlockdevIface for Ext4Bd<D> {
86    fn phys_block_size(&mut self) -> u32 {
87        PHYSICAL_BSIZE
88    }
89
90    fn phys_block_count(&mut self) -> u64 {
91        self.phys_bcount
92    }
93
94    fn open(&mut self) -> std::io::Result<()> {
95        Ok(())
96    }
97
98    fn close(&mut self) -> std::io::Result<()> {
99        Ok(())
100    }
101
102    fn read(&mut self, buf: *mut u8, block: u64, bcount: u32) -> std::io::Result<u32> {
103        let start = block * PHYSICAL_BSIZE as u64;
104        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
105        let slice = unsafe { core::slice::from_raw_parts_mut(buf, len as usize) };
106        let len = self.device.run_async(self.device.read(start, slice))?;
107        Ok((len / PHYSICAL_BSIZE as usize) as u32)
108    }
109
110    fn write(&mut self, buf: *const u8, block: u64, bcount: u32) -> std::io::Result<u32> {
111        let start = block * PHYSICAL_BSIZE as u64;
112        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
113        let slice = unsafe { core::slice::from_raw_parts(buf, len as usize) };
114        let len = self.device.run_async(self.device.write(start, slice))?;
115        Ok((len / PHYSICAL_BSIZE as usize) as u32)
116    }
117
118    fn lock(&self) -> std::io::Result<()> {
119        self.lock.lock();
120        Ok(())
121    }
122
123    fn unlock(&self) -> std::io::Result<()> {
124        self.lock.unlock();
125        Ok(())
126    }
127}
128
129impl<D: Device> Ext4Bd<D> {
130    fn new(device: D, _name: &str, phys_bcount: u64) -> Self {
131        Self {
132            device,
133            phys_bcount,
134            lock: MpLock::new(),
135        }
136    }
137}
138
139impl From<FileKind> for ExternalKind {
140    fn from(value: FileKind) -> Self {
141        match value {
142            FileKind::Regular => ExternalKind::Regular,
143            FileKind::Directory => ExternalKind::Directory,
144            FileKind::Symlink => ExternalKind::SymLink,
145            FileKind::Other => ExternalKind::Other,
146        }
147    }
148}
149
/// Counter feeding the `blockdev-N` names generated for each new store.
static BDEV_ID: AtomicU64 = AtomicU64::new(0);

/// Logical block size in bytes, handed to `Ext4Blockdev::new`.
const LOGICAL_BSIZE: u32 = 1 << 9;
/// Physical block size in bytes, reported by the block device adapter.
const PHYSICAL_BSIZE: u32 = 1 << 9;
154
155impl<D: Device> Ext4Store<D> {
156    pub async fn new(device: D, name: &str) -> Result<Self> {
157        let bdname = format!("blockdev-{}", BDEV_ID.fetch_add(1, Ordering::SeqCst));
158        let max = device.len().await? as u64;
159        let bcount = max / LOGICAL_BSIZE as u64;
160        let phys_bcount = max / PHYSICAL_BSIZE as u64;
161        let bd = Ext4Blockdev::new(
162            Ext4Bd::new(device.clone(), bdname.as_str(), phys_bcount),
163            LOGICAL_BSIZE,
164            bcount,
165            name,
166        )?;
167
168        let mut fs = Ext4Fs::new(bd, CString::new(name).unwrap(), false)?;
169
170        match fs.create_dir("ids") {
171            Err(e) if e.kind() != ErrorKind::AlreadyExists => {
172                return Err(e.into());
173            }
174            _ => {}
175        }
176
177        Ok(Self {
178            fs: Mutex::new(fs),
179            device,
180            ext_cache: Mutex::new(ExtCache::default()),
181            len_cache: Mutex::new(HashMap::default()),
182        })
183    }
184
185    fn get_len_from_cache(&self, id: ObjID) -> Option<u64> {
186        self.len_cache.lock().unwrap().get(&id).copied()
187    }
188
189    fn invalidate_len(&self, id: ObjID) {
190        self.len_cache.lock().unwrap().remove(&id);
191    }
192
193    fn set_len_in_cache(&self, id: ObjID, len: u64) {
194        self.len_cache.lock().unwrap().insert(id, len);
195    }
196
197    pub fn get_id_path(&self, id: ObjID) -> (String, String) {
198        let top = id.to_be_bytes()[0];
199        let us = format!("ids/{:x}", top);
200        (us, format!("ids/{:x}/{:x}", top, id))
201    }
202
203    pub fn get_object_as_file<'a>(
204        &self,
205        fs: &'a mut MutexGuard<'_, Ext4Fs>,
206        id: ObjID,
207        create: bool,
208    ) -> Result<Ext4File<'a>> {
209        let flags = if create { O_RDWR | O_CREAT } else { O_RDWR };
210        if let Some(ino) = objid_to_ino(id) {
211            return Ok(fs.open_file_from_inode(ino, flags)?);
212        }
213        let path = self.get_id_path(id);
214        if create {
215            match fs.create_dir(&path.0) {
216                Ok(_) => {}
217                Err(e) if e.kind() == ErrorKind::AlreadyExists => {}
218                Err(e) => Err(e)?,
219            }
220        }
221        Ok(fs.open_file(&path.1, flags)?)
222    }
223}
224
225impl<D: Device> PagedObjectStore for Ext4Store<D> {
226    async fn create_object(&self, id: crate::ObjID) -> Result<()> {
227        let mut fs = self.fs.lock().unwrap();
228        self.get_object_as_file(&mut fs, id, true)?;
229        fs.flush()?;
230        Ok(())
231    }
232
233    async fn delete_object(&self, id: crate::ObjID) -> Result<()> {
234        let path = self.get_id_path(id);
235        let mut fs = self.fs.lock().unwrap();
236        fs.remove_file(&path.1)?;
237        fs.flush()?;
238        Ok(())
239    }
240
241    async fn len(&self, id: crate::ObjID) -> Result<u64> {
242        if let Some(len) = self.get_len_from_cache(id) {
243            return Ok(len);
244        }
245        let mut fs = self.fs.lock().unwrap();
246        let mut file = self.get_object_as_file(&mut fs, id, false)?;
247        self.set_len_in_cache(id, file.len());
248        Ok(file.len())
249    }
250
251    async fn read_object(&self, id: crate::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
252        let mut fs = self.fs.lock().unwrap();
253        let mut file = self.get_object_as_file(&mut fs, id, false)?;
254        file.seek(SeekFrom::Start(offset))?;
255        Ok(file.read(buf)?)
256    }
257
258    async fn write_object(&self, id: crate::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
259        let mut fs = self.fs.lock().unwrap();
260        let mut file = self.get_object_as_file(&mut fs, id, false)?;
261        if offset > file.len() {
262            file.ensure_backing(offset)
263                .inspect_err(|e| tracing::warn!("failed to ensure backing for object: {}", e))?;
264            file.truncate(offset).inspect_err(|e| {
265                tracing::warn!("failed to initialize object to {}: {}", offset, e)
266            })?;
267            self.invalidate_len(id);
268        }
269        file.seek(SeekFrom::Start(offset))?;
270        // TODO
271        file.write(buf)?;
272        drop(file);
273        fs.flush()?;
274        Ok(())
275    }
276
277    async fn page_in_object<'a>(
278        &self,
279        id: ObjID,
280        reqs: &'a mut [crate::PageRequest],
281    ) -> Result<usize> {
282        let mut fs = self.fs.lock().unwrap();
283        let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
284        let mut file = self
285            .get_object_as_file(&mut fs, id, false)
286            .inspect_err(|e| tracing::error!("go err: {}", e))?;
287        let mut inode = file
288            .get_file_inode()
289            .inspect_err(|e| tracing::error!("gfi err: {}", e))?;
290        let max_len = inode.size();
291        tracing::trace!("paging  in request for {} reqs", reqs.len());
292
293        let mut iters = 0;
294        drop(file);
295        drop(fs);
296        let mut blocks = reqs
297            .iter_mut()
298            .map(|req| {
299                let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
300
301                let mut page = req.start_page;
302                let end = req.start_page + req.nr_pages as i64;
303
304                let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
305                if rem_blocks > 64
306                    && page as usize * PAGE_SIZE >= (max_len as usize + PAGE_SIZE * 8)
307                {
308                    let _ = disk_pages.push(DevicePage::Hole(rem_blocks));
309                } else {
310                    let mut fs = self.fs.lock().unwrap();
311                    while page < end {
312                        iters += 1;
313                        if iters % 100 == 0 {
314                            drop(fs);
315                            self.device.yield_now();
316                            fs = self.fs.lock().unwrap();
317                        }
318
319                        let mut block = page as u32;
320                        if objid_to_ino(id).is_some() {
321                            // External files don't have null pages
322                            block -= 1;
323                        }
324                        block = block * blocks_per_page as u32;
325                        let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
326
327                        let item = match inode.get_data_blocks(block, rem_blocks, false) {
328                            Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
329                                if dblock == 0 {
330                                    DevicePage::Hole(nr_dblk)
331                                } else {
332                                    DevicePage::Run(dblock, nr_dblk)
333                                }
334                            }
335                            _ => match inode.get_data_block(block, false)? {
336                                0 => DevicePage::Hole(1),
337                                dpg => DevicePage::Run(dpg, 1),
338                            },
339                        };
340                        page += item.nr_pages() as i64;
341                        if let Some(prev) = disk_pages.last_mut() {
342                            if !prev.try_extend(&item) {
343                                disk_pages.push(item).unwrap();
344                            }
345                        } else {
346                            disk_pages.push(item).unwrap();
347                        }
348                    }
349                }
350                Result::Ok((req, disk_pages))
351            })
352            .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
353        for br in blocks.iter_mut() {
354            let pages = &br.1[..];
355            tracing::trace!("paging in {:?}", pages);
356            let _len = br.0.page_in(pages, &self.device).await?;
357        }
358
359        Ok(reqs.len())
360    }
361
362    async fn page_out_object<'a>(
363        &self,
364        id: ObjID,
365        reqs: &'a mut [crate::PageRequest],
366    ) -> Result<usize> {
367        let end_offset = reqs
368            .iter()
369            .max_by_key(|req| req.start_page as u64 + req.nr_pages as u64)
370            .map(|end_req| {
371                (end_req.start_page as u64 + end_req.nr_pages as u64) * PAGE_SIZE as u64
372            });
373
374        let start = Instant::now();
375        let mut fs = self.fs.lock().unwrap();
376        let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
377        let mut file = self.get_object_as_file(&mut fs, id, false)?;
378        if end_offset.unwrap_or(0) >= file.len() {
379            drop(file);
380            drop(fs);
381            self.write_object(id, end_offset.unwrap_or(0), &[0u8; PAGE_SIZE])
382                .await?;
383            fs = self.fs.lock().unwrap();
384        } else {
385            drop(file);
386        }
387        let mut file = self.get_object_as_file(&mut fs, id, false)?;
388        let mut inode = file.get_file_inode()?;
389        drop(file);
390        drop(fs);
391        tracing::trace!("paging out request for {} reqs", reqs.len());
392
393        let setup_done = Instant::now();
394        let mut iters = 0;
395        let mut blocks = reqs
396            .iter_mut()
397            .map(|req| {
398                let mut fs = self.fs.lock().unwrap();
399                let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
400
401                let mut page = req.start_page;
402                let end = req.start_page + req.nr_pages as i64;
403                while page < end {
404                    let mut block = page as u32;
405                    if objid_to_ino(id).is_some() {
406                        // External files don't have null pages
407                        block -= 1;
408                    }
409
410                    iters += 1;
411                    if iters % 100 == 0 {
412                        drop(fs);
413                        self.device.yield_now();
414                        fs = self.fs.lock().unwrap();
415                    }
416
417                    block = block * blocks_per_page as u32;
418                    let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
419
420                    let item = match inode.get_data_blocks(block, rem_blocks, true) {
421                        Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
422                            if dblock == 0 {
423                                Result::Err(ErrorKind::Other.into())?
424                            } else {
425                                DevicePage::Run(dblock, nr_dblk)
426                            }
427                        }
428                        _ => match inode.get_data_block(block, true)? {
429                            0 => Result::Err(ErrorKind::Other.into())?,
430                            dpg => DevicePage::Run(dpg, 1),
431                        },
432                    };
433                    page += item.nr_pages() as i64;
434                    if let Some(prev) = disk_pages.last_mut() {
435                        if !prev.try_extend(&item) {
436                            disk_pages.push(item).unwrap();
437                        }
438                    } else {
439                        disk_pages.push(item).unwrap();
440                    }
441                }
442                Result::Ok((req, disk_pages))
443            })
444            .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
445
446        let blocks_found = Instant::now();
447        for br in blocks.iter_mut() {
448            let pages = &br.1[..];
449            let _len = br.0.page_out(pages, &self.device).await?;
450        }
451        let mut fs = self.fs.lock().unwrap();
452        fs.flush()?;
453        let io_done = Instant::now();
454        tracing::trace!(
455            "==> {}ms {}ms {}ms",
456            (setup_done - start).as_millis(),
457            (blocks_found - setup_done).as_millis(),
458            (io_done - blocks_found).as_millis()
459        );
460        Ok(reqs.len())
461    }
462
463    async fn enumerate_external(&self, id: ObjID) -> Result<std::vec::Vec<ExternalFile>> {
464        let mut fs = self.fs.lock().unwrap();
465        let mut inonr = objid_to_ino(id).ok_or(ErrorKind::InvalidInput)?;
466        if inonr == 0 {
467            inonr = ROOT_DIRECTORY_INODE;
468        }
469
470        if let Some(r) = self.ext_cache.lock().unwrap().readdir(inonr) {
471            return Ok(r);
472        }
473
474        let mut inode = fs.get_inode(inonr)?;
475        let diriter = fs.dirents(&mut inode)?;
476
477        let diriter = diriter.filter_map(|de| {
478            de.1.ok().map(|ino| {
479                (
480                    ExternalFile::new(&de.0, ino.kind().into(), ino_to_objid(ino.num())),
481                    ino.size() as usize,
482                )
483            })
484        });
485        self.ext_cache.lock().unwrap().reset_dir(inonr);
486        self.ext_cache.lock().unwrap().fill_dir(inonr, diriter);
487        if let Some(r) = self.ext_cache.lock().unwrap().readdir(inonr) {
488            Ok(r)
489        } else {
490            Err(ErrorKind::Other.into())
491        }
492    }
493
494    async fn find_external(&self, id: ObjID) -> Result<usize> {
495        let mut fs = self.fs.lock().unwrap();
496        let mut inonr = objid_to_ino(id).ok_or(ErrorKind::InvalidInput)?;
497        if inonr == 0 {
498            inonr = ROOT_DIRECTORY_INODE;
499        }
500        if let Some(info) = self.ext_cache.lock().unwrap().get_by_id(id) {
501            return Ok(info.1);
502        }
503        let inode = fs.get_inode(inonr)?;
504        Ok(inode.size() as usize)
505    }
506}