1#![feature(naked_functions)]
2#![feature(io_error_more)]
3#![feature(test)]
4
5use std::{
6 collections::HashMap,
7 sync::{Arc, Mutex, OnceLock},
8 time::Duration,
9};
10
11use async_executor::Executor;
12use async_io::{block_on, Timer};
13use disk::Disk;
14use memstore::virtio::init_virtio;
15use object_store::{Ext4Store, ExternalFile, PagedDevice, PagedObjectStore};
16use tracing_subscriber::fmt::format::FmtSpan;
17use twizzler::{
18 collections::vec::{VecObject, VecObjectAlloc},
19 object::{ObjID, Object, ObjectBuilder},
20 Result,
21};
22use twizzler_abi::pager::{
23 CompletionToKernel, CompletionToPager, PagerCompletionData, RequestFromKernel, RequestFromPager,
24};
25use twizzler_queue::{QueueBase, QueueSender};
26use twizzler_rt_abi::{error::TwzError, object::MapFlags};
27
28use crate::{data::PagerData, request_handle::handle_kernel_request};
29
30mod data;
31mod disk;
32mod handle;
33mod helpers;
34#[allow(unused)]
36mod memstore;
37mod nvme;
38mod physrw;
39mod request_handle;
40mod stats;
41
42pub use handle::{pager_close_handle, pager_open_handle};
43
44pub static EXECUTOR: OnceLock<Executor> = OnceLock::new();
45
46fn tracing_init() {
50 tracing::subscriber::set_global_default(
51 tracing_subscriber::fmt()
52 .with_max_level(tracing::Level::INFO)
53 .with_span_events(FmtSpan::ENTER)
54 .without_time()
55 .finish(),
56 )
57 .unwrap();
58 tracing_log::LogTracer::init().unwrap();
59}
60
61fn data_structure_init() -> PagerData {
65 let pager_data = PagerData::new();
66
67 return pager_data;
68}
69
70fn attach_queue<T: std::marker::Copy, U: std::marker::Copy, Q>(
74 obj_id: ObjID,
75 queue_constructor: impl FnOnce(twizzler_queue::Queue<T, U>) -> Q,
76) -> Result<Q> {
77 tracing::debug!("Pager Attaching Queue: {}", obj_id);
78
79 let object = unsafe {
80 Object::<QueueBase<T, U>>::map_unchecked(obj_id, MapFlags::READ | MapFlags::WRITE).unwrap()
81 };
82
83 tracing::debug!("queue mapped; constructing...");
84 let queue: twizzler_queue::Queue<T, U> = twizzler_queue::Queue::from(object.into_handle());
86 Ok(queue_constructor(queue))
87}
88
89fn queue_init(
90 q1: ObjID,
91 q2: ObjID,
92) -> (
93 twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>,
94 twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
95) {
96 let rq = attach_queue::<RequestFromKernel, CompletionToKernel, _>(
97 q1,
98 twizzler_queue::CallbackQueueReceiver::new,
99 )
100 .unwrap();
101 let sq = attach_queue::<RequestFromPager, CompletionToPager, _>(
102 q2,
103 twizzler_queue::QueueSender::new,
104 )
105 .unwrap();
106
107 return (rq, sq);
108}
109
110fn async_runtime_init(n: i32) -> &'static Executor<'static> {
115 let ex = EXECUTOR.get_or_init(|| Executor::new());
116
117 for _ in 0..n {
118 std::thread::spawn(|| block_on(ex.run(std::future::pending::<()>())));
119 }
120
121 return ex;
122}
123
124fn pager_init(
128 q1: ObjID,
129 q2: ObjID,
130) -> (
131 twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>,
132 twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
133 PagerData,
134 &'static Executor<'static>,
135) {
136 tracing_init();
137 let data = data_structure_init();
138 let ex = async_runtime_init(4);
139 let (rq, sq) = queue_init(q1, q2);
140
141 tracing::debug!("init complete");
142 return (rq, sq, data, ex);
143}
144
145fn spawn_queues(
146 ctx: &'static PagerContext,
147 kernel_rq: Arc<twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>>,
148 ex: &'static Executor<'static>,
149) {
150 tracing::debug!("spawning queues...");
151 ex.spawn(listen_queue(kernel_rq, ctx, handle_kernel_request, ex))
152 .detach();
153}
154
155async fn listen_queue<R, C, F, I>(
156 kernel_rq: Arc<twizzler_queue::CallbackQueueReceiver<R, C>>,
157 ctx: &'static PagerContext,
158 handler: impl Fn(&'static PagerContext, u32, R) -> F + Copy + Send + Sync + 'static,
159 _ex: &'static Executor<'static>,
160) where
161 F: std::future::Future<Output = I> + Send + 'static,
162 R: std::fmt::Debug + Copy + Send + Sync + 'static,
163 C: std::fmt::Debug + Copy + Send + Sync + 'static,
164 I: IntoIterator<Item = C> + Send + Sync + 'static,
165{
166 loop {
167 tracing::trace!("queue receiving...");
168 let (id, request) = kernel_rq.receive().await.unwrap();
169 tracing::trace!("got request: ({},{:?})", id, request);
170
171 let comp = handler(ctx, id, request).await;
172 for comp in comp {
173 notify(&kernel_rq, id, comp).await;
174 }
175 }
176}
177
178async fn notify<R, C>(q: &Arc<twizzler_queue::CallbackQueueReceiver<R, C>>, id: u32, res: C)
179where
180 R: std::fmt::Debug + Copy + Send + Sync,
181 C: std::fmt::Debug + Copy + Send + Sync + 'static,
182{
183 q.complete(id, res).await.unwrap();
184 }
186
187async fn report_ready(
188 ctx: &PagerContext,
189 _ex: &'static Executor<'static>,
190) -> Option<PagerCompletionData> {
191 tracing::debug!("sending ready signal to kernel");
192 let request = RequestFromPager::new(twizzler_abi::pager::PagerRequest::Ready);
193
194 match ctx.sender.submit_and_wait(request).await {
195 Ok(completion) => {
196 tracing::debug!("received completion for ready signal: {:?}", completion);
197 return Some(completion.data());
198 }
199 Err(e) => {
200 tracing::warn!("error from ready signal {:?}", e);
201 return None;
202 }
203 }
204}
205
206struct PagerContext {
207 data: PagerData,
208 sender: Arc<QueueSender<RequestFromPager, CompletionToPager>>,
209 kernel_notify:
210 Arc<twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>>,
211
212 stores: Mutex<Stores>,
215}
216
217struct Stores {
218 map: HashMap<ObjID, Arc<Store>>,
219 default: ObjID,
220}
221
222impl Stores {
223 pub fn paged_ostore(&self, id: Option<ObjID>) -> Result<Arc<Store>> {
224 match id {
225 Some(id) => Ok(self.map.get(&id).ok_or(TwzError::INVALID_ARGUMENT)?.clone()),
226 None => Ok(self
227 .map
228 .get(&self.default)
229 .ok_or(TwzError::INVALID_ARGUMENT)?
230 .clone()),
231 }
232 }
233
234 pub fn insert_device(
235 &mut self,
236 store: Arc<dyn PagedObjectStore + Send + Sync + 'static>,
237 dev: Arc<dyn PagedDevice + Send + Sync + 'static>,
238 ) {
239 self.map
240 .insert(ObjID::new(0), Arc::new(Store { inner: store, dev }));
241 }
242}
243
244#[allow(dead_code)]
245struct Store {
246 inner: Arc<dyn PagedObjectStore + Send + Sync + 'static>,
247 dev: Arc<dyn PagedDevice + Send + Sync + 'static>,
248}
249
250impl PagedObjectStore for Store {
251 fn create_object(&self, id: object_store::ObjID) -> Result<()> {
252 self.inner.create_object(id)
253 }
254
255 fn delete_object(&self, id: object_store::ObjID) -> Result<()> {
256 self.inner.delete_object(id)
257 }
258
259 fn len(&self, id: object_store::ObjID) -> Result<u64> {
260 self.inner.len(id)
261 }
262
263 fn read_object(&self, id: object_store::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
264 self.inner.read_object(id, offset, buf)
265 }
266
267 fn write_object(&self, id: object_store::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
268 self.inner.write_object(id, offset, buf)
269 }
270
271 fn get_config_id(&self) -> Result<object_store::ObjID> {
272 self.inner.get_config_id()
273 }
274
275 fn set_config_id(&self, id: object_store::ObjID) -> Result<()> {
276 self.inner.set_config_id(id)
277 }
278
279 fn flush(&self) -> Result<()> {
280 self.inner.flush()
281 }
282
283 fn page_in_object<'a>(
284 &self,
285 id: object_store::ObjID,
286 reqs: &'a mut [object_store::PageRequest],
287 ) -> Result<usize> {
288 self.inner.page_in_object(id, reqs)
289 }
290
291 fn page_out_object<'a>(
292 &self,
293 id: object_store::ObjID,
294 reqs: &'a mut [object_store::PageRequest],
295 ) -> Result<usize> {
296 self.inner.page_out_object(id, reqs)
297 }
298
299 fn enumerate_external(&self, _id: object_store::ObjID) -> Result<Vec<ExternalFile>> {
300 self.inner.enumerate_external(_id)
301 }
302
303 fn find_external(&self, _id: object_store::ObjID) -> Result<usize> {
304 self.inner.find_external(_id)
305 }
306}
307
308impl PagerContext {
309 pub fn paged_ostore(&self, id: Option<ObjID>) -> Result<Arc<Store>> {
310 self.stores.lock().unwrap().paged_ostore(id)
311 }
312
313 pub async fn enumerate_external(&'static self, id: ObjID) -> Result<Vec<ExternalFile>> {
314 blocking::unblock(move || {
315 Ok(self
316 .paged_ostore(None)?
317 .enumerate_external(id.raw())?
318 .iter()
319 .cloned()
320 .collect())
321 })
322 .await
323 }
324
325 pub async fn notify_kernel(&'static self, id: u32, comp: CompletionToKernel) {
326 notify(&self.kernel_notify, id, comp).await;
327 }
328}
329
330static PAGER_CTX: OnceLock<PagerContext> = OnceLock::new();
331
332fn do_pager_start(q1: ObjID, q2: ObjID) -> ObjID {
333 let (rq, sq, data, ex) = pager_init(q1, q2);
334 #[allow(unused_variables)]
335 let disk = block_on(ex.run(Disk::new(ex))).unwrap();
336
337 let sq = Arc::new(sq);
338 let rq = Arc::new(rq);
339 let _ = PAGER_CTX.set(PagerContext {
340 data,
341 sender: sq,
342 kernel_notify: rq.clone(),
343 stores: Mutex::new(Stores {
344 map: HashMap::new(),
345 default: ObjID::new(0),
346 }),
347 });
348 let ctx = PAGER_CTX.get().unwrap();
349
350 #[allow(unused_variables)]
351 let virtio_store = block_on(ex.run(async move { init_virtio().await })).unwrap();
352 let ext4_store = Ext4Store::new(disk.clone(), "/").unwrap();
353
354 ctx.stores
355 .lock()
356 .unwrap()
357 .insert_device(Arc::new(ext4_store), Arc::new(disk));
358
359 spawn_queues(ctx, rq, ex);
360
361 block_on(ex.run(async move {
362 let _ = report_ready(&ctx, ex).await.unwrap();
363 }));
364
365 tracing::info!("pager ready");
366
367 if false {
369 let _ = ex
370 .spawn(async {
371 let pager = PAGER_CTX.get().unwrap();
372 loop {
373 pager.data.print_stats();
374 pager.data.reset_stats();
375 Timer::after(Duration::from_millis(1000)).await;
376 }
377 })
378 .detach();
379 }
380
381 let bootstrap_id = ctx.paged_ostore(None).map_or(0u128, |po| {
382 po.get_config_id().unwrap_or_else(|_| {
383 tracing::info!("creating new naming object");
384 let vo =
385 VecObject::<u32, VecObjectAlloc>::new(ObjectBuilder::default().persist()).unwrap();
386 po.set_config_id(vo.object().id().raw()).unwrap();
387 vo.object().id().raw()
388 })
389 });
390 tracing::info!("found root namespace: {:x}", bootstrap_id);
391
392 return bootstrap_id.into();
393}
394
395#[secgate::secure_gate]
396pub fn pager_start(q1: ObjID, q2: ObjID) -> Result<ObjID> {
397 Ok(do_pager_start(q1, q2))
398}
399
400#[secgate::secure_gate]
401pub fn adv_lethe() -> Result<()> {
402 PAGER_CTX
403 .get()
404 .unwrap()
405 .paged_ostore(None)?
406 .flush()
407 .unwrap();
408 Ok(())
409}
410
411#[secgate::secure_gate]
412pub fn disk_len(id: ObjID) -> Result<u64> {
413 PAGER_CTX
414 .get()
415 .unwrap()
416 .paged_ostore(None)?
417 .len(id.raw())
418 .map_err(|_| TwzError::NOT_SUPPORTED)
420}