1#[cfg(not(target_os = "twizzler"))]
2use std::io::Result;
3use std::{
4 collections::HashMap, ffi::CString, io::{ErrorKind, Read, Seek, SeekFrom, Write}, path::Path, sync::{
5 Mutex, MutexGuard, atomic::{AtomicU64, Ordering}
6 }, time::Instant
7};
8
9use libc::{PATH_MAX, mode_t};
10use lwext4_rs::{
11 Ext4Blockdev, Ext4BlockdevIface, Ext4File, Ext4Fs, MpLock, O_CREAT, O_RDONLY, O_RDWR,
12};
13use mayheap::Vec;
14use pager_dynamic::{ino_to_objid, objid_to_ino, ExternalFile};
15#[cfg(target_os = "twizzler")]
16use twizzler::Result;
17
18use crate::{
19 paged_object_store::MAYHEAP_LEN, DevicePage, ExternalFileStore, ExternalOpenFlags, ObjID,
20 PagedDevice, PagedObjectStore, PosIo, PAGE_SIZE,
21};
22
/// Object store backed by an ext4 filesystem (via `lwext4_rs`) on a paged device.
///
/// Objects whose ID maps to an inode number (`objid_to_ino`) are opened directly by inode;
/// all other objects are stored as files at `ids/<first big-endian byte of id>/<id>`.
pub struct Ext4Store<D: Device> {
    /// The mounted filesystem; every filesystem operation goes through this lock.
    fs: Mutex<Ext4Fs>,
    /// Cache of object ID -> inode number, filled the first time an object is opened.
    ino_cache: Mutex<HashMap<ObjID, u32>>,
    /// Cache of object ID -> object length in bytes.
    len_cache: Mutex<HashMap<ObjID, u64>>,
    /// The underlying device, used directly for page-in / page-out I/O.
    device: D,
}
29
30pub trait Device: PosIo + PagedDevice + Sync + Send + Clone + 'static {}
31
32impl<T: PosIo + PagedDevice + Sync + Send + Clone + 'static> Device for T {}
33
/// Adapter exposing a [`Device`] to lwext4 through the `Ext4BlockdevIface` callbacks.
struct Ext4Bd<D: Device> {
    /// Device that block reads and writes are forwarded to.
    device: D,
    /// Number of physical blocks (of `PHYSICAL_BSIZE` bytes) reported to lwext4.
    phys_bcount: u64,
    /// Lock acquired/released by the `lock`/`unlock` callbacks.
    lock: MpLock,
}
39
40impl<D: Device> Ext4BlockdevIface for Ext4Bd<D> {
41 fn phys_block_size(&mut self) -> u32 {
42 PHYSICAL_BSIZE
43 }
44
45 fn phys_block_count(&mut self) -> u64 {
46 self.phys_bcount
47 }
48
49 fn open(&mut self) -> std::io::Result<()> {
50 Ok(())
51 }
52
53 fn close(&mut self) -> std::io::Result<()> {
54 Ok(())
55 }
56
57 fn read(&mut self, buf: *mut u8, block: u64, bcount: u32) -> std::io::Result<u32> {
58 let start = block * PHYSICAL_BSIZE as u64;
59 let len = bcount as u64 * PHYSICAL_BSIZE as u64;
60 let slice = unsafe { core::slice::from_raw_parts_mut(buf, len as usize) };
61 let len = self.device.run_async(self.device.read(start, slice))?;
62 Ok((len / PHYSICAL_BSIZE as usize) as u32)
63 }
64
65 fn write(&mut self, buf: *const u8, block: u64, bcount: u32) -> std::io::Result<u32> {
66 let start = block * PHYSICAL_BSIZE as u64;
67 let len = bcount as u64 * PHYSICAL_BSIZE as u64;
68 let slice = unsafe { core::slice::from_raw_parts(buf, len as usize) };
69 let len = self.device.run_async(self.device.write(start, slice))?;
70 Ok((len / PHYSICAL_BSIZE as usize) as u32)
71 }
72
73 fn lock(&self) -> std::io::Result<()> {
74 self.lock.lock();
75 Ok(())
76 }
77
78 fn unlock(&self) -> std::io::Result<()> {
79 self.lock.unlock();
80 Ok(())
81 }
82}
83
84impl<D: Device> Ext4Bd<D> {
85 fn new(device: D, _name: &str, phys_bcount: u64) -> Self {
86 Self {
87 device,
88 phys_bcount,
89 lock: MpLock::new(),
90 }
91 }
92}
93
/// Counter used to generate a distinct `blockdev-N` name for each store created.
static BDEV_ID: AtomicU64 = AtomicU64::new(0);

/// Logical block size, in bytes, handed to `Ext4Blockdev::new`.
const LOGICAL_BSIZE: u32 = 1 << 9;
/// Physical block size, in bytes, reported through `phys_block_size`.
const PHYSICAL_BSIZE: u32 = 1 << 9;
98
99impl<D: Device> Ext4Store<D> {
100 pub async fn new(device: D, name: &str) -> Result<Self> {
101 let bdname = format!("blockdev-{}", BDEV_ID.fetch_add(1, Ordering::SeqCst));
102 let max = device.len().await? as u64;
103 let bcount = max / LOGICAL_BSIZE as u64;
104 let phys_bcount = max / PHYSICAL_BSIZE as u64;
105 let bd = Ext4Blockdev::new(
106 Ext4Bd::new(device.clone(), bdname.as_str(), phys_bcount),
107 LOGICAL_BSIZE,
108 bcount,
109 name,
110 )?;
111
112 let mut fs = Ext4Fs::new(bd, CString::new(name).unwrap(), false)?;
113
114 match fs.create_dir("ids") {
115 Err(e) if e.kind() != ErrorKind::AlreadyExists => {
116 return Err(e.into());
117 }
118 _ => {}
119 }
120
121 Ok(Self {
122 fs: Mutex::new(fs),
123 device,
124 len_cache: Mutex::new(HashMap::default()),
125 ino_cache: Mutex::new(HashMap::default()),
126 })
127 }
128
129 fn get_len_from_cache(&self, id: ObjID) -> Option<u64> {
130 self.len_cache.lock().unwrap().get(&id).copied()
131 }
132
133 async fn readlink(&self, id: ObjID) -> Result<String> {
134 let mut buf = vec![0; PATH_MAX as usize];
135 let len = self.read_object(id, 0, &mut buf).await?;
136 buf.truncate(len);
137 String::from_utf8(buf).map_err(|_| ErrorKind::InvalidData.into())
138 }
139
140 fn invalidate_len(&self, id: ObjID) {
141 self.len_cache.lock().unwrap().remove(&id);
142 }
143
144 fn set_len_in_cache(&self, id: ObjID, len: u64) {
145 self.len_cache.lock().unwrap().insert(id, len);
146 }
147
148 pub fn get_id_path(&self, id: ObjID) -> (String, String) {
149 let top = id.to_be_bytes()[0];
150 let us = format!("ids/{:x}", top);
151 (us, format!("ids/{:x}/{:x}", top, id))
152 }
153
154 pub fn set_len(&self, id: ObjID, len: u64) -> Result<()> {
155 let mut fs = self.fs.lock().unwrap();
156 let mut file = self.get_object_as_file(&mut fs, id, false)?;
157 file.truncate(len)?;
158 self.set_len_in_cache(id, len);
159 Ok(())
160 }
161
162 pub fn lookup_ino_cache(&self, id: ObjID) -> Option<u32> {
163 self.ino_cache.lock().unwrap().get(&id).copied()
164 }
165
166 pub fn insert_ino_cache(&self, id: ObjID, ino: u32) {
167 self.ino_cache.lock().unwrap().insert(id, ino);
168 }
169
170 pub fn remove_ino_cache(&self, id: ObjID) {
171 self.ino_cache.lock().unwrap().remove(&id);
172 }
173
174 fn do_get_object_as_file<'a>(
175 &self,
176 fs: &'a mut MutexGuard<'_, Ext4Fs>,
177 id: ObjID,
178 create: bool,
179 ) -> Result<Ext4File<'a>> {
180 let flags = if create { O_RDWR | O_CREAT } else { O_RDWR };
181 if let Some(ino) = objid_to_ino(id) {
182 return Ok(fs.open_file_from_inode(ino, flags)?);
183 }
184 let path = self.get_id_path(id);
185 if create {
186 match fs.create_dir(&path.0) {
187 Ok(_) => {}
188 Err(e) if e.kind() == ErrorKind::AlreadyExists => {}
189 Err(e) => Err(e)?,
190 }
191 }
192 Ok(fs.open_file(&path.1, flags)?)
193 }
194
195 pub fn get_object_as_file<'a>(
196 &self,
197 fs: &'a mut MutexGuard<'_, Ext4Fs>,
198 id: ObjID,
199 create: bool,
200 ) -> Result<Ext4File<'a>> {
201 if let Some(ino) = self.lookup_ino_cache(id) {
202 return Ok(fs.open_file_from_inode(ino, O_RDWR)?);
203 }
204 let mut file = self.do_get_object_as_file(fs, id, create)?;
205 self.insert_ino_cache(id, file.get_file_inode()?.num());
206 Ok(file)
207 }
208}
209
210impl<D: Device> PagedObjectStore for Ext4Store<D> {
211 async fn create_object(&self, id: crate::ObjID) -> Result<()> {
212 let mut fs = self.fs.lock().unwrap();
213 self.get_object_as_file(&mut fs, id, true)?;
214 fs.flush()?;
215 Ok(())
216 }
217
218 async fn delete_object(&self, id: crate::ObjID) -> Result<()> {
219 let path = self.get_id_path(id);
220 let mut fs = self.fs.lock().unwrap();
221 fs.remove_file(&path.1)?;
222 self.remove_ino_cache(id);
223 self.invalidate_len(id);
224 fs.flush()?;
225 Ok(())
226 }
227
228 async fn len(&self, id: crate::ObjID) -> Result<u64> {
229 if let Some(len) = self.get_len_from_cache(id) {
230 return Ok(len);
231 }
232 let mut fs = self.fs.lock().unwrap();
233 let mut file = self.get_object_as_file(&mut fs, id, false)?;
234 self.set_len_in_cache(id, file.len());
235 Ok(file.len())
236 }
237
238 async fn read_object(&self, id: crate::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
239 let mut fs = self.fs.lock().unwrap();
240 let mut file = self.get_object_as_file(&mut fs, id, false)?;
241 file.seek(SeekFrom::Start(offset))?;
242 Ok(file.read(buf)?)
243 }
244
245 async fn write_object(&self, id: crate::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
246 let mut fs = self.fs.lock().unwrap();
247 let mut file = self.get_object_as_file(&mut fs, id, false)?;
248 if offset > file.len() {
249 file.ensure_backing(offset)
250 .inspect_err(|e| tracing::warn!("failed to ensure backing for object: {}", e))?;
251 file.truncate(offset).inspect_err(|e| {
252 tracing::warn!("failed to initialize object to {}: {}", offset, e)
253 })?;
254 self.invalidate_len(id);
255 }
256 file.seek(SeekFrom::Start(offset))?;
257 file.write(buf)?;
259 drop(file);
260 fs.flush()?;
261 Ok(())
262 }
263
264 async fn page_in_object<'a>(
265 &self,
266 id: ObjID,
267 reqs: &'a mut [crate::PageRequest],
268 ) -> Result<usize> {
269 let mut fs = self.fs.lock().unwrap();
270 let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
271 let mut file = self
272 .get_object_as_file(&mut fs, id, false)
273 .inspect_err(|e| tracing::error!("go err: {}", e))?;
274 let mut inode = file
275 .get_file_inode()
276 .inspect_err(|e| tracing::error!("gfi err: {}", e))?;
277 let max_len = inode.size();
278 tracing::trace!("paging in request for {} reqs", reqs.len());
279
280 let mut iters = 0;
281 drop(file);
282 drop(fs);
283 let mut blocks = reqs
284 .iter_mut()
285 .map(|req| {
286 let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
287
288 let mut page = req.start_page;
289 let end = req.start_page + req.nr_pages as i64;
290
291 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
292 if rem_blocks > 64
293 && page as usize * PAGE_SIZE >= (max_len as usize + PAGE_SIZE * 8)
294 {
295 let _ = disk_pages.push(DevicePage::Hole(rem_blocks));
296 } else {
297 let mut fs = self.fs.lock().unwrap();
298 while page < end {
299 iters += 1;
300 if iters % 100 == 0 {
301 drop(fs);
302 self.device.yield_now();
303 fs = self.fs.lock().unwrap();
304 }
305
306 let mut block = page as u32;
307 if objid_to_ino(id).is_some() && block > 0 {
308 block -= 1;
310 }
311 block = block * blocks_per_page as u32;
312 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
313
314 let item = match inode.get_data_blocks(block, rem_blocks, false) {
315 Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
316 if dblock == 0 {
317 DevicePage::Hole(nr_dblk)
318 } else {
319 DevicePage::Run(dblock, nr_dblk)
320 }
321 }
322 _ => match inode.get_data_block(block, false)? {
323 0 => DevicePage::Hole(1),
324 dpg => DevicePage::Run(dpg, 1),
325 },
326 };
327 page += item.nr_pages() as i64;
328 if let Some(prev) = disk_pages.last_mut() {
329 if !prev.try_extend(&item) {
330 disk_pages.push(item).unwrap();
331 }
332 } else {
333 disk_pages.push(item).unwrap();
334 }
335 }
336 }
337 Result::Ok((req, disk_pages))
338 })
339 .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
340 for br in blocks.iter_mut() {
341 let pages = &br.1[..];
342 tracing::trace!("paging in {:?}", pages);
343 let _len = br.0.page_in(pages, &self.device).await?;
344 }
345
346 Ok(reqs.len())
347 }
348
349 async fn page_out_object<'a>(
350 &self,
351 id: ObjID,
352 reqs: &'a mut [crate::PageRequest],
353 ) -> Result<usize> {
354 let end_offset = reqs
355 .iter()
356 .max_by_key(|req| req.start_page as u64 + req.nr_pages as u64)
357 .map(|end_req| {
358 (end_req.start_page as u64 + end_req.nr_pages as u64) * PAGE_SIZE as u64
359 });
360
361 let start = Instant::now();
362 let mut fs = self.fs.lock().unwrap();
363 let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
364 let mut file = self.get_object_as_file(&mut fs, id, false)?;
365 if end_offset.unwrap_or(0) >= file.len() {
366 drop(file);
367 drop(fs);
368 self.write_object(id, end_offset.unwrap_or(0), &[0u8; PAGE_SIZE])
369 .await?;
370 fs = self.fs.lock().unwrap();
371 } else {
372 drop(file);
373 }
374 let mut file = self.get_object_as_file(&mut fs, id, false)?;
375 let mut inode = file.get_file_inode()?;
376 drop(file);
377 drop(fs);
378 tracing::trace!("paging out {:x} request for {} reqs", id, reqs.len());
379
380 let setup_done = Instant::now();
381 let mut iters = 0;
382 let mut blocks = reqs
383 .iter_mut()
384 .map(|req| {
385 let mut fs = self.fs.lock().unwrap();
386 let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
387
388 let mut page = req.start_page;
389 let end = req.start_page + req.nr_pages as i64;
390 while page < end {
391 let mut block = page as u32;
392 tracing::trace!("paging out block {}", block);
393 if objid_to_ino(id).is_some() && block > 0 {
394 block -= 1;
396 }
397
398 iters += 1;
399 if iters % 100 == 0 {
400 drop(fs);
401 self.device.yield_now();
402 fs = self.fs.lock().unwrap();
403 }
404
405 block = block * blocks_per_page as u32;
406 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
407
408 let item = match inode.get_data_blocks(block, rem_blocks, true) {
409 Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
410 if dblock == 0 {
411 tracing::warn!(
412 "got unexpected zero block when paging out object {:x}",
413 id
414 );
415 Result::Err(ErrorKind::Other.into())?
416 } else {
417 DevicePage::Run(dblock, nr_dblk)
418 }
419 }
420 _ => match inode.get_data_block(block, true).inspect_err(|e| tracing::warn!("failed to get_data_block: {}", e))? {
421 0 => {
422 tracing::warn!(
423 "got unexpected zero block when paging out object {:x} in fallback",
424 id
425 );
426 Result::Err(ErrorKind::Other.into())?
427 }
428 dpg => DevicePage::Run(dpg, 1),
429 },
430 };
431 page += item.nr_pages() as i64;
432 if let Some(prev) = disk_pages.last_mut() {
433 if !prev.try_extend(&item) {
434 disk_pages.push(item).unwrap();
435 }
436 } else {
437 disk_pages.push(item).unwrap();
438 }
439 }
440 Result::Ok((req, disk_pages))
441 })
442 .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
443 tracing::trace!(
444 "found blocks for paging out in {}ms",
445 (Instant::now() - setup_done).as_millis()
446 );
447
448 let blocks_found = Instant::now();
449 for br in blocks.iter_mut() {
450 let pages = &br.1[..];
451 let _len = br.0.page_out(pages, &self.device).await?;
452 }
453 let mut fs = self.fs.lock().unwrap();
454 fs.flush()?;
455 let io_done = Instant::now();
456 tracing::trace!(
457 "==> {}ms {}ms {}ms",
458 (setup_done - start).as_millis(),
459 (blocks_found - setup_done).as_millis(),
460 (io_done - blocks_found).as_millis()
461 );
462 Ok(reqs.len())
463 }
464}
465
466impl<D: Device> ExternalFileStore for Ext4Store<D> {
467 async fn open_external(
468 &self,
469 at: Option<ObjID>,
470 path: impl AsRef<Path>,
471 flags: ExternalOpenFlags,
472 mode: mode_t,
473 link_to: Option<ObjID>,
474 ) -> Result<ExternalFile> {
475 let mut at_ino = if let Some(at) = at {
476 objid_to_ino(at).ok_or(ErrorKind::InvalidInput)?
477 } else {
478 2
479 };
480 if at_ino < 2 {
481 at_ino = 2;
482 }
483 tracing::trace!(
484 "opening external file at {:?} with flags {:?} and mode {:o} at ino {}, link_to = {:?}",
485 path.as_ref(),
486 flags,
487 mode, at_ino
488 ,link_to
489 );
490
491 let mut fs = self.fs.lock().unwrap();
492
493 let mut oflags = if flags.contains(ExternalOpenFlags::READ)
494 && flags.contains(ExternalOpenFlags::WRITE)
495 {
496 O_RDWR
497 } else if flags.contains(ExternalOpenFlags::READ) {
498 O_RDONLY
499 } else {
500 O_RDWR
501 };
502
503 if flags.contains(ExternalOpenFlags::CREATE) {
504 oflags |= O_CREAT;
505 }
506
507 if let Some(link_to) = link_to {
508 fs.link(path.as_ref().to_string_lossy().as_ref(),
509 at_ino,
510 objid_to_ino(link_to).ok_or(ErrorKind::InvalidInput)?,
511 ).inspect_err(|e| tracing::warn!("failed to link: {}", e))?;
512 }
513
514 let mut file = fs.open_file_from_container(
515 at_ino,
516 path.as_ref().to_string_lossy().as_ref(),
517 oflags,
518 mode,
519 )?;
520
521 Ok(ExternalFile::new(
522 path.as_ref().to_string_lossy().to_string(),
523 file.get_file_inode()?.kind().into(),
524 ino_to_objid(file.get_file_inode()?.num()),
525 ))
526 }
527
528 async fn unlink_external(&self, at: Option<ObjID>, path: impl AsRef<Path>) -> Result<()> {
529 if at.is_some() {
530 return Err(ErrorKind::Unsupported.into());
531 }
532 let mut fs = self.fs.lock().unwrap();
533 fs.remove_file(path.as_ref().to_string_lossy().as_ref())?;
534 return Ok(());
535 }
536
537 async fn readlink_external(&self, at: ObjID) -> Result<String> {
538 self.readlink(at).await
539 }
540
541 async fn readdir_external(
542 &self,
543 dir: ObjID,
544 skip: usize,
545 count: usize,
546 entries: &mut std::vec::Vec<ExternalFile>,
547 ) -> Result<()> {
548 entries.clear();
549 tracing::trace!(
550 "enumerating external namespace {:x} (skip {}, count {})",
551 dir,
552 skip,
553 count
554 );
555 let mut fs = self.fs.lock().unwrap();
556 let mut inonr = objid_to_ino(dir).ok_or(ErrorKind::InvalidInput)?;
557 if inonr == 0 {
558 inonr = 2;
559 }
560
561 let mut inode = fs.get_inode(inonr)?;
562 let diriter = fs.dirents(&mut inode)?;
563
564 let diriter = diriter.skip(skip).take(count).filter_map(|de| {
565 de.1.ok().map(|ino| {
566 ExternalFile::new(
567 unsafe { str::from_utf8_unchecked(&de.0) },
568 ino.kind().into(),
569 ino_to_objid(ino.num()),
570 )
571 })
572 });
573
574 for entry in diriter {
575 tracing::trace!("record external file {} in namespace {:x} with ID {} and kind {:?}",
576 entry.name().unwrap_or("<invalid utf8>"),
577 dir,
578 entry.id,
579 entry.kind
580 );
581 entries.push(entry)
582 }
583 tracing::trace!("collected {} entries", entries.len());
584
585 Ok(())
586 }
587
588 async fn link_external(
589 &self,
590 _file: &ExternalFile,
591 _at: Option<ObjID>,
592 _path: impl AsRef<Path>,
593 ) -> Result<()> {
594 todo!()
595 }
596
597 async fn stat_external(&self, _path: impl AsRef<Path>) -> Result<libc::stat> {
598 todo!()
599 }
600
601 async fn fstat_external(&self, _file: Option<ObjID>) -> Result<libc::stat> {
602 todo!()
603 }
604
605 async fn symlink_external(
606 &self,
607 _at: Option<ObjID>,
608 _target: impl AsRef<Path>,
609 _linkpath: impl AsRef<Path>,
610 ) -> Result<()> {
611 todo!()
612 }
613}