pager_srv/
lib.rs

1#![feature(naked_functions)]
2#![feature(io_error_more)]
3#![feature(test)]
4#![feature(thread_local)]
5
6use std::{
7    sync::{Arc, OnceLock},
8    time::Duration,
9};
10
11use async_io::Timer;
12use disk::Disk;
13use memstore::virtio::init_virtio;
14use object_store::{Ext4Store, ExternalFile, PagedObjectStore};
15use physrw::{init_pr_mgr, report_ready};
16use threads::{run_async, spawn_async, PagerThreadPool};
17use tracing_subscriber::fmt::format::FmtSpan;
18use twizzler::{
19    collections::vec::{VecObject, VecObjectAlloc},
20    object::{ObjID, Object, ObjectBuilder},
21    Result,
22};
23use twizzler_abi::pager::{
24    CompletionToKernel, CompletionToPager, RequestFromKernel, RequestFromPager,
25};
26use twizzler_queue::{QueueBase, SubmissionFlags};
27use twizzler_rt_abi::{error::TwzError, object::MapFlags};
28
29use crate::data::PagerData;
30
31mod data;
32mod disk;
33mod handle;
34mod helpers;
35// in-progress
36#[allow(unused)]
37mod memstore;
38mod nvme;
39mod physrw;
40mod request_handle;
41mod stats;
42mod threads;
43
44pub use handle::{pager_close_handle, pager_open_handle};
45
46/***
47 * Tracing Init
48 */
49fn tracing_init() {
50    tracing::subscriber::set_global_default(
51        tracing_subscriber::fmt()
52            .with_max_level(tracing::Level::INFO)
53            .with_span_events(FmtSpan::ENTER)
54            .without_time()
55            .finish(),
56    )
57    .unwrap();
58    tracing_log::LogTracer::init().unwrap();
59}
60
61/***
62 * Pager Data Structures Initialization
63 */
64fn data_structure_init() -> PagerData {
65    let pager_data = PagerData::new();
66
67    return pager_data;
68}
69
70/***
71 * Queue Initializing
72 */
73fn attach_queue<T: std::marker::Copy, U: std::marker::Copy, Q>(
74    obj_id: ObjID,
75    queue_constructor: impl FnOnce(twizzler_queue::Queue<T, U>) -> Q,
76) -> Result<Q> {
77    tracing::debug!("Pager Attaching Queue: {}", obj_id);
78
79    let object = unsafe {
80        Object::<QueueBase<T, U>>::map_unchecked(obj_id, MapFlags::READ | MapFlags::WRITE).unwrap()
81    };
82
83    tracing::debug!("queue mapped; constructing...");
84    // Ensure the object is cast or transformed to match the expected `Queue` type
85    let queue: twizzler_queue::Queue<T, U> = twizzler_queue::Queue::from(object.into_handle());
86    Ok(queue_constructor(queue))
87}
88
89fn queue_init(
90    q1: ObjID,
91    q2: ObjID,
92) -> (
93    twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
94    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
95) {
96    let rq = attach_queue::<RequestFromKernel, CompletionToKernel, _>(q1, |q| q).unwrap();
97    let sq = attach_queue::<RequestFromPager, CompletionToPager, _>(
98        q2,
99        twizzler_queue::QueueSender::new,
100    )
101    .unwrap();
102
103    return (rq, sq);
104}
105
106/***
107 * Pager Initialization generic function which calls specific initialization functions
108 */
109fn pager_init(
110    q1: ObjID,
111    q2: ObjID,
112) -> (
113    &'static twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
114    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
115    PagerData,
116) {
117    tracing_init();
118    let data = data_structure_init();
119    let (rq, sq) = queue_init(q1, q2);
120
121    let rq = unsafe { Box::into_raw(Box::new(rq)).as_ref().unwrap() };
122    tracing::debug!("init complete");
123    return (rq, sq, data);
124}
125
126struct PagerContext {
127    data: PagerData,
128    kernel_notify: &'static twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
129    _pool: PagerThreadPool,
130
131    store: OnceLock<Ext4Store<Disk>>,
132}
133
134impl PagerContext {
135    pub fn paged_ostore(&self, _id: Option<ObjID>) -> Result<&Ext4Store<Disk>> {
136        Ok(self.store.wait())
137    }
138
139    pub async fn enumerate_external(&'static self, id: ObjID) -> Result<Vec<ExternalFile>> {
140        Ok(self
141            .paged_ostore(None)?
142            .enumerate_external(id.raw())
143            .await?
144            .iter()
145            .cloned()
146            .collect())
147    }
148
149    pub fn notify_kernel(&'static self, id: u32, comp: CompletionToKernel) {
150        self.kernel_notify
151            .complete(id, comp, SubmissionFlags::empty())
152            .unwrap();
153    }
154}
155
156static PAGER_CTX: OnceLock<PagerContext> = OnceLock::new();
157
158fn do_pager_start(q1: ObjID, q2: ObjID) -> ObjID {
159    let (rq, sq, data) = pager_init(q1, q2);
160    let sq = Arc::new(sq);
161    init_pr_mgr(sq);
162    #[allow(unused_variables)]
163    let disk = run_async(Disk::new()).unwrap();
164
165    let _ = PAGER_CTX.set(PagerContext {
166        data,
167        kernel_notify: rq,
168        store: OnceLock::new(),
169        _pool: PagerThreadPool::new(rq),
170    });
171    let ctx = PAGER_CTX.get().unwrap();
172
173    #[allow(unused_variables)]
174    let virtio_store = run_async(init_virtio()).unwrap();
175    let ext4_store = run_async(Ext4Store::new(disk.clone(), "/")).unwrap();
176
177    let _ = ctx.store.set(ext4_store);
178
179    run_async(async move {
180        let _ = report_ready().await.unwrap();
181    });
182
183    tracing::info!("pager ready");
184
185    //disk::benches::bench_disk(ctx);
186    if false {
187        spawn_async(async {
188            let pager = PAGER_CTX.get().unwrap();
189            loop {
190                pager.data.print_stats();
191                pager.data.reset_stats();
192                Timer::after(Duration::from_millis(1000)).await;
193            }
194        });
195    }
196
197    let bootstrap_id = ctx.paged_ostore(None).map_or(0u128, |po| {
198        if let Ok(id) = run_async(po.get_config_id()) {
199            id
200        } else {
201            tracing::info!("creating new naming object");
202            let vo =
203                VecObject::<u32, VecObjectAlloc>::new(ObjectBuilder::default().persist()).unwrap();
204            run_async(po.set_config_id(vo.object().id().raw())).unwrap();
205            vo.object().id().raw()
206        }
207    });
208    tracing::info!("found root namespace: {:x}", bootstrap_id);
209
210    return bootstrap_id.into();
211}
212
213#[secgate::secure_gate]
214pub fn pager_start(q1: ObjID, q2: ObjID) -> Result<ObjID> {
215    Ok(do_pager_start(q1, q2))
216}
217
218#[secgate::secure_gate]
219pub fn adv_lethe() -> Result<()> {
220    run_async(PAGER_CTX.get().unwrap().paged_ostore(None)?.flush()).unwrap();
221    Ok(())
222}
223
224#[secgate::secure_gate]
225pub fn disk_len(id: ObjID) -> Result<u64> {
226    run_async(PAGER_CTX.get().unwrap().paged_ostore(None)?.len(id.raw()))
227        // TODO: err
228        .map_err(|_| TwzError::NOT_SUPPORTED)
229}