1#[cfg(not(target_os = "twizzler"))]
2use std::io::Result;
3use std::{
4 collections::HashMap,
5 ffi::CString,
6 io::{ErrorKind, Read, Seek, SeekFrom, Write},
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Mutex, MutexGuard,
10 },
11 time::Instant,
12};
13
14use efs::fs::ext2::inode::ROOT_DIRECTORY_INODE;
15use lwext4_rs::{
16 Ext4Blockdev, Ext4BlockdevIface, Ext4File, Ext4Fs, FileKind, MpLock, O_CREAT, O_RDWR,
17};
18use mayheap::Vec;
19#[cfg(target_os = "twizzler")]
20use twizzler::Result;
21
22use crate::{
23 ino_to_objid, objid_to_ino, paged_object_store::MAYHEAP_LEN, DevicePage, ExternalFile,
24 ExternalKind, ObjID, PagedDevice, PagedObjectStore, PosIo, PAGE_SIZE,
25};
26
/// In-memory cache of directory listings for "external" files, keyed two ways:
/// by object ID and by (directory inode, entry name).
#[derive(Default)]
struct ExtCache {
    // Object ID -> (file metadata, file size in bytes).
    ids: HashMap<ObjID, (ExternalFile, usize)>,
    // Directory inode -> map of entry name -> (file metadata, size).
    names: HashMap<u32, HashMap<String, (ExternalFile, usize)>>,
}
32
33#[allow(dead_code)]
34impl ExtCache {
35 pub fn fill_dir(&mut self, ino: u32, items: impl Iterator<Item = (ExternalFile, usize)>) {
36 let entry = self.names.entry(ino).or_default();
37 for item in items {
38 if let Some(name) = item.0.name() {
39 entry.insert(name.to_owned(), item.clone());
40 self.ids.insert(item.0.id.into(), item);
41 }
42 }
43 }
44
45 pub fn readdir(&self, ino: u32) -> Option<std::vec::Vec<ExternalFile>> {
46 let entry = self.names.get(&ino)?;
47 Some(entry.values().map(|e| e.0).collect())
48 }
49
50 pub fn reset_dir(&mut self, ino: u32) {
51 if let Some(mut map) = self.names.remove(&ino) {
52 for item in map.drain() {
53 self.ids.remove(&item.1 .0.id.into());
54 }
55 }
56 }
57
58 pub fn lookup(&self, ino: u32, name: &str) -> Option<(ExternalFile, usize)> {
59 let map = self.names.get(&ino)?;
60 map.get(name).copied()
61 }
62
63 pub fn get_by_id(&self, id: ObjID) -> Option<(ExternalFile, usize)> {
64 self.ids.get(&id).copied()
65 }
66}
67
/// A paged object store backed by an ext4 filesystem (via lwext4) on top of a
/// paged block device `D`.
pub struct Ext4Store<D: Device> {
    // The mounted filesystem; all lwext4 operations go through this lock.
    fs: Mutex<Ext4Fs>,
    // Cache of external-file directory listings (see ExtCache).
    ext_cache: Mutex<ExtCache>,
    // Cache of object lengths, to avoid opening a file just to ask its size.
    len_cache: Mutex<HashMap<ObjID, u64>>,
    // The underlying device, used directly for paged I/O.
    device: D,
}
74
/// Convenience alias-trait for the device requirements of `Ext4Store`:
/// positional I/O plus paged I/O, shareable across threads, and cloneable so
/// both the filesystem and the store can hold a handle.
pub trait Device: PosIo + PagedDevice + Sync + Send + Clone + 'static {}

// Blanket impl: anything meeting the bounds is a `Device`.
impl<T: PosIo + PagedDevice + Sync + Send + Clone + 'static> Device for T {}
78
/// Adapter exposing a `Device` to lwext4 as a raw block device.
struct Ext4Bd<D: Device> {
    device: D,
    // Total number of physical blocks on the device.
    phys_bcount: u64,
    // Lock used by lwext4's lock/unlock callbacks.
    lock: MpLock,
}
84
/// Block-device callbacks handed to lwext4. Block addresses are converted to
/// byte offsets using `PHYSICAL_BSIZE` and forwarded to the underlying device.
impl<D: Device> Ext4BlockdevIface for Ext4Bd<D> {
    fn phys_block_size(&mut self) -> u32 {
        PHYSICAL_BSIZE
    }

    fn phys_block_count(&mut self) -> u64 {
        self.phys_bcount
    }

    // The device needs no explicit open/close; these are no-ops.
    fn open(&mut self) -> std::io::Result<()> {
        Ok(())
    }

    fn close(&mut self) -> std::io::Result<()> {
        Ok(())
    }

    /// Read `bcount` physical blocks starting at `block` into `buf`.
    /// Returns the number of whole blocks actually read.
    fn read(&mut self, buf: *mut u8, block: u64, bcount: u32) -> std::io::Result<u32> {
        let start = block * PHYSICAL_BSIZE as u64;
        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
        // SAFETY: assumes the Ext4BlockdevIface contract guarantees `buf`
        // points to at least `bcount * PHYSICAL_BSIZE` writable bytes for the
        // duration of this call — TODO confirm against lwext4_rs docs.
        let slice = unsafe { core::slice::from_raw_parts_mut(buf, len as usize) };
        // The async device read is driven to completion synchronously here.
        let len = self.device.run_async(self.device.read(start, slice))?;
        Ok((len / PHYSICAL_BSIZE as usize) as u32)
    }

    /// Write `bcount` physical blocks starting at `block` from `buf`.
    /// Returns the number of whole blocks actually written.
    fn write(&mut self, buf: *const u8, block: u64, bcount: u32) -> std::io::Result<u32> {
        let start = block * PHYSICAL_BSIZE as u64;
        let len = bcount as u64 * PHYSICAL_BSIZE as u64;
        // SAFETY: assumes `buf` points to at least `bcount * PHYSICAL_BSIZE`
        // readable bytes per the blockdev interface contract — TODO confirm.
        let slice = unsafe { core::slice::from_raw_parts(buf, len as usize) };
        let len = self.device.run_async(self.device.write(start, slice))?;
        Ok((len / PHYSICAL_BSIZE as usize) as u32)
    }

    // lwext4's coarse-grained lock hooks; these never fail.
    fn lock(&self) -> std::io::Result<()> {
        self.lock.lock();
        Ok(())
    }

    fn unlock(&self) -> std::io::Result<()> {
        self.lock.unlock();
        Ok(())
    }
}
128
129impl<D: Device> Ext4Bd<D> {
130 fn new(device: D, _name: &str, phys_bcount: u64) -> Self {
131 Self {
132 device,
133 phys_bcount,
134 lock: MpLock::new(),
135 }
136 }
137}
138
139impl From<FileKind> for ExternalKind {
140 fn from(value: FileKind) -> Self {
141 match value {
142 FileKind::Regular => ExternalKind::Regular,
143 FileKind::Directory => ExternalKind::Directory,
144 FileKind::Symlink => ExternalKind::SymLink,
145 FileKind::Other => ExternalKind::Other,
146 }
147 }
148}
149
// Monotonic counter used to give each block device instance a unique name.
static BDEV_ID: AtomicU64 = AtomicU64::new(0);

// Logical and physical block sizes presented to lwext4, in bytes.
const LOGICAL_BSIZE: u32 = 512;
const PHYSICAL_BSIZE: u32 = 512;
154
impl<D: Device> Ext4Store<D> {
    /// Mount the ext4 filesystem on `device` under mountpoint `name`, and
    /// ensure the top-level "ids" directory (the root for ID-addressed
    /// objects) exists.
    pub async fn new(device: D, name: &str) -> Result<Self> {
        // Give each block device registration a unique name.
        let bdname = format!("blockdev-{}", BDEV_ID.fetch_add(1, Ordering::SeqCst));
        let max = device.len().await? as u64;
        let bcount = max / LOGICAL_BSIZE as u64;
        let phys_bcount = max / PHYSICAL_BSIZE as u64;
        let bd = Ext4Blockdev::new(
            Ext4Bd::new(device.clone(), bdname.as_str(), phys_bcount),
            LOGICAL_BSIZE,
            bcount,
            name,
        )?;

        let mut fs = Ext4Fs::new(bd, CString::new(name).unwrap(), false)?;

        // "ids" may survive from a previous mount; only a real failure aborts.
        match fs.create_dir("ids") {
            Err(e) if e.kind() != ErrorKind::AlreadyExists => {
                return Err(e.into());
            }
            _ => {}
        }

        Ok(Self {
            fs: Mutex::new(fs),
            device,
            ext_cache: Mutex::new(ExtCache::default()),
            len_cache: Mutex::new(HashMap::default()),
        })
    }

    // Return the cached length of object `id`, if known.
    fn get_len_from_cache(&self, id: ObjID) -> Option<u64> {
        self.len_cache.lock().unwrap().get(&id).copied()
    }

    // Drop any cached length for object `id` (e.g. after a resize).
    fn invalidate_len(&self, id: ObjID) {
        self.len_cache.lock().unwrap().remove(&id);
    }

    // Record the length of object `id`.
    fn set_len_in_cache(&self, id: ObjID, len: u64) {
        self.len_cache.lock().unwrap().insert(id, len);
    }

    /// Compute the on-disk paths for object `id`: the parent directory
    /// ("ids/<top-byte>", fanning objects out across subdirectories) and the
    /// object file path within it.
    pub fn get_id_path(&self, id: ObjID) -> (String, String) {
        let top = id.to_be_bytes()[0];
        let us = format!("ids/{:x}", top);
        (us, format!("ids/{:x}/{:x}", top, id))
    }

    /// Open the backing file for object `id`. IDs that encode an inode number
    /// are opened by inode directly; otherwise the "ids/…" path is used,
    /// creating the parent directory and file when `create` is set.
    pub fn get_object_as_file<'a>(
        &self,
        fs: &'a mut MutexGuard<'_, Ext4Fs>,
        id: ObjID,
        create: bool,
    ) -> Result<Ext4File<'a>> {
        let flags = if create { O_RDWR | O_CREAT } else { O_RDWR };
        if let Some(ino) = objid_to_ino(id) {
            return Ok(fs.open_file_from_inode(ino, flags)?);
        }
        let path = self.get_id_path(id);
        if create {
            // The parent fan-out directory may already exist; that's fine.
            match fs.create_dir(&path.0) {
                Ok(_) => {}
                Err(e) if e.kind() == ErrorKind::AlreadyExists => {}
                Err(e) => Err(e)?,
            }
        }
        Ok(fs.open_file(&path.1, flags)?)
    }
}
224
225impl<D: Device> PagedObjectStore for Ext4Store<D> {
226 async fn create_object(&self, id: crate::ObjID) -> Result<()> {
227 let mut fs = self.fs.lock().unwrap();
228 self.get_object_as_file(&mut fs, id, true)?;
229 fs.flush()?;
230 Ok(())
231 }
232
233 async fn delete_object(&self, id: crate::ObjID) -> Result<()> {
234 let path = self.get_id_path(id);
235 let mut fs = self.fs.lock().unwrap();
236 fs.remove_file(&path.1)?;
237 fs.flush()?;
238 Ok(())
239 }
240
241 async fn len(&self, id: crate::ObjID) -> Result<u64> {
242 if let Some(len) = self.get_len_from_cache(id) {
243 return Ok(len);
244 }
245 let mut fs = self.fs.lock().unwrap();
246 let mut file = self.get_object_as_file(&mut fs, id, false)?;
247 self.set_len_in_cache(id, file.len());
248 Ok(file.len())
249 }
250
251 async fn read_object(&self, id: crate::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
252 let mut fs = self.fs.lock().unwrap();
253 let mut file = self.get_object_as_file(&mut fs, id, false)?;
254 file.seek(SeekFrom::Start(offset))?;
255 Ok(file.read(buf)?)
256 }
257
258 async fn write_object(&self, id: crate::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
259 let mut fs = self.fs.lock().unwrap();
260 let mut file = self.get_object_as_file(&mut fs, id, false)?;
261 if offset > file.len() {
262 file.ensure_backing(offset)
263 .inspect_err(|e| tracing::warn!("failed to ensure backing for object: {}", e))?;
264 file.truncate(offset).inspect_err(|e| {
265 tracing::warn!("failed to initialize object to {}: {}", offset, e)
266 })?;
267 self.invalidate_len(id);
268 }
269 file.seek(SeekFrom::Start(offset))?;
270 file.write(buf)?;
272 drop(file);
273 fs.flush()?;
274 Ok(())
275 }
276
277 async fn page_in_object<'a>(
278 &self,
279 id: ObjID,
280 reqs: &'a mut [crate::PageRequest],
281 ) -> Result<usize> {
282 let mut fs = self.fs.lock().unwrap();
283 let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
284 let mut file = self
285 .get_object_as_file(&mut fs, id, false)
286 .inspect_err(|e| tracing::error!("go err: {}", e))?;
287 let mut inode = file
288 .get_file_inode()
289 .inspect_err(|e| tracing::error!("gfi err: {}", e))?;
290 let max_len = inode.size();
291 tracing::trace!("paging in request for {} reqs", reqs.len());
292
293 let mut iters = 0;
294 drop(file);
295 drop(fs);
296 let mut blocks = reqs
297 .iter_mut()
298 .map(|req| {
299 let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
300
301 let mut page = req.start_page;
302 let end = req.start_page + req.nr_pages as i64;
303
304 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
305 if rem_blocks > 64
306 && page as usize * PAGE_SIZE >= (max_len as usize + PAGE_SIZE * 8)
307 {
308 let _ = disk_pages.push(DevicePage::Hole(rem_blocks));
309 } else {
310 let mut fs = self.fs.lock().unwrap();
311 while page < end {
312 iters += 1;
313 if iters % 100 == 0 {
314 drop(fs);
315 self.device.yield_now();
316 fs = self.fs.lock().unwrap();
317 }
318
319 let mut block = page as u32;
320 if objid_to_ino(id).is_some() {
321 block -= 1;
323 }
324 block = block * blocks_per_page as u32;
325 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
326
327 let item = match inode.get_data_blocks(block, rem_blocks, false) {
328 Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
329 if dblock == 0 {
330 DevicePage::Hole(nr_dblk)
331 } else {
332 DevicePage::Run(dblock, nr_dblk)
333 }
334 }
335 _ => match inode.get_data_block(block, false)? {
336 0 => DevicePage::Hole(1),
337 dpg => DevicePage::Run(dpg, 1),
338 },
339 };
340 page += item.nr_pages() as i64;
341 if let Some(prev) = disk_pages.last_mut() {
342 if !prev.try_extend(&item) {
343 disk_pages.push(item).unwrap();
344 }
345 } else {
346 disk_pages.push(item).unwrap();
347 }
348 }
349 }
350 Result::Ok((req, disk_pages))
351 })
352 .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
353 for br in blocks.iter_mut() {
354 let pages = &br.1[..];
355 tracing::trace!("paging in {:?}", pages);
356 let _len = br.0.page_in(pages, &self.device).await?;
357 }
358
359 Ok(reqs.len())
360 }
361
362 async fn page_out_object<'a>(
363 &self,
364 id: ObjID,
365 reqs: &'a mut [crate::PageRequest],
366 ) -> Result<usize> {
367 let end_offset = reqs
368 .iter()
369 .max_by_key(|req| req.start_page as u64 + req.nr_pages as u64)
370 .map(|end_req| {
371 (end_req.start_page as u64 + end_req.nr_pages as u64) * PAGE_SIZE as u64
372 });
373
374 let start = Instant::now();
375 let mut fs = self.fs.lock().unwrap();
376 let blocks_per_page = PAGE_SIZE / fs.block_size()? as usize;
377 let mut file = self.get_object_as_file(&mut fs, id, false)?;
378 if end_offset.unwrap_or(0) >= file.len() {
379 drop(file);
380 drop(fs);
381 self.write_object(id, end_offset.unwrap_or(0), &[0u8; PAGE_SIZE])
382 .await?;
383 fs = self.fs.lock().unwrap();
384 } else {
385 drop(file);
386 }
387 let mut file = self.get_object_as_file(&mut fs, id, false)?;
388 let mut inode = file.get_file_inode()?;
389 drop(file);
390 drop(fs);
391 tracing::trace!("paging out request for {} reqs", reqs.len());
392
393 let setup_done = Instant::now();
394 let mut iters = 0;
395 let mut blocks = reqs
396 .iter_mut()
397 .map(|req| {
398 let mut fs = self.fs.lock().unwrap();
399 let mut disk_pages = Vec::<DevicePage, MAYHEAP_LEN>::new();
400
401 let mut page = req.start_page;
402 let end = req.start_page + req.nr_pages as i64;
403 while page < end {
404 let mut block = page as u32;
405 if objid_to_ino(id).is_some() {
406 block -= 1;
408 }
409
410 iters += 1;
411 if iters % 100 == 0 {
412 drop(fs);
413 self.device.yield_now();
414 fs = self.fs.lock().unwrap();
415 }
416
417 block = block * blocks_per_page as u32;
418 let rem_blocks = (end - page) as u32 * blocks_per_page as u32;
419
420 let item = match inode.get_data_blocks(block, rem_blocks, true) {
421 Ok((dblock, nr_dblk)) if nr_dblk > 0 => {
422 if dblock == 0 {
423 Result::Err(ErrorKind::Other.into())?
424 } else {
425 DevicePage::Run(dblock, nr_dblk)
426 }
427 }
428 _ => match inode.get_data_block(block, true)? {
429 0 => Result::Err(ErrorKind::Other.into())?,
430 dpg => DevicePage::Run(dpg, 1),
431 },
432 };
433 page += item.nr_pages() as i64;
434 if let Some(prev) = disk_pages.last_mut() {
435 if !prev.try_extend(&item) {
436 disk_pages.push(item).unwrap();
437 }
438 } else {
439 disk_pages.push(item).unwrap();
440 }
441 }
442 Result::Ok((req, disk_pages))
443 })
444 .try_collect::<Vec<_, MAYHEAP_LEN>>()?;
445
446 let blocks_found = Instant::now();
447 for br in blocks.iter_mut() {
448 let pages = &br.1[..];
449 let _len = br.0.page_out(pages, &self.device).await?;
450 }
451 let mut fs = self.fs.lock().unwrap();
452 fs.flush()?;
453 let io_done = Instant::now();
454 tracing::trace!(
455 "==> {}ms {}ms {}ms",
456 (setup_done - start).as_millis(),
457 (blocks_found - setup_done).as_millis(),
458 (io_done - blocks_found).as_millis()
459 );
460 Ok(reqs.len())
461 }
462
463 async fn enumerate_external(&self, id: ObjID) -> Result<std::vec::Vec<ExternalFile>> {
464 let mut fs = self.fs.lock().unwrap();
465 let mut inonr = objid_to_ino(id).ok_or(ErrorKind::InvalidInput)?;
466 if inonr == 0 {
467 inonr = ROOT_DIRECTORY_INODE;
468 }
469
470 if let Some(r) = self.ext_cache.lock().unwrap().readdir(inonr) {
471 return Ok(r);
472 }
473
474 let mut inode = fs.get_inode(inonr)?;
475 let diriter = fs.dirents(&mut inode)?;
476
477 let diriter = diriter.filter_map(|de| {
478 de.1.ok().map(|ino| {
479 (
480 ExternalFile::new(&de.0, ino.kind().into(), ino_to_objid(ino.num())),
481 ino.size() as usize,
482 )
483 })
484 });
485 self.ext_cache.lock().unwrap().reset_dir(inonr);
486 self.ext_cache.lock().unwrap().fill_dir(inonr, diriter);
487 if let Some(r) = self.ext_cache.lock().unwrap().readdir(inonr) {
488 Ok(r)
489 } else {
490 Err(ErrorKind::Other.into())
491 }
492 }
493
494 async fn find_external(&self, id: ObjID) -> Result<usize> {
495 let mut fs = self.fs.lock().unwrap();
496 let mut inonr = objid_to_ino(id).ok_or(ErrorKind::InvalidInput)?;
497 if inonr == 0 {
498 inonr = ROOT_DIRECTORY_INODE;
499 }
500 if let Some(info) = self.ext_cache.lock().unwrap().get_by_id(id) {
501 return Ok(info.1);
502 }
503 let inode = fs.get_inode(inonr)?;
504 Ok(inode.size() as usize)
505 }
506}