pager_srv/
lib.rs

1#![feature(io_error_more)]
2#![feature(test)]
3#![feature(thread_local)]
4
5use std::{
6    sync::{Arc, OnceLock},
7    time::Duration,
8};
9
10use async_io::Timer;
11use disk::Disk;
12use memstore::virtio::init_virtio;
13use object_store::{Ext4Store, PagedObjectStore};
14use physrw::{init_pr_mgr, report_ready};
15use threads::{run_async, spawn_async, PagerThreadPool};
16use tracing_subscriber::fmt::format::FmtSpan;
17use twizzler::{
18    collections::vec::{VecObject, VecObjectAlloc},
19    object::{ObjID, Object, ObjectBuilder},
20    Result,
21};
22use twizzler_abi::pager::{
23    CompletionToKernel, CompletionToPager, RequestFromKernel, RequestFromPager,
24};
25use twizzler_queue::{QueueBase, SubmissionFlags};
26use twizzler_rt_abi::{error::TwzError, object::MapFlags};
27
28use crate::data::PagerData;
29
30mod data;
31mod disk;
32mod handle;
33mod helpers;
34// in-progress
35#[allow(unused)]
36mod memstore;
37mod nvme;
38mod physrw;
39mod request_handle;
40mod stats;
41mod threads;
42
43/***
44 * Tracing Init
45 */
46fn tracing_init() {
47    tracing::subscriber::set_global_default(
48        tracing_subscriber::fmt()
49            .with_max_level(tracing::Level::INFO)
50            .with_span_events(FmtSpan::ENTER)
51            .without_time()
52            .finish(),
53    )
54    .unwrap();
55    tracing_log::LogTracer::init().unwrap();
56}
57
58/***
59 * Pager Data Structures Initialization
60 */
61fn data_structure_init() -> PagerData {
62    let pager_data = PagerData::new();
63
64    return pager_data;
65}
66
67/***
68 * Queue Initializing
69 */
70fn attach_queue<T: std::marker::Copy, U: std::marker::Copy, Q>(
71    obj_id: ObjID,
72    queue_constructor: impl FnOnce(twizzler_queue::Queue<T, U>) -> Q,
73) -> Result<Q> {
74    tracing::debug!("Pager Attaching Queue: {}", obj_id);
75
76    let object = unsafe {
77        Object::<QueueBase<T, U>>::map_unchecked(obj_id, MapFlags::READ | MapFlags::WRITE).unwrap()
78    };
79
80    tracing::debug!("queue mapped; constructing...");
81    // Ensure the object is cast or transformed to match the expected `Queue` type
82    let queue: twizzler_queue::Queue<T, U> = twizzler_queue::Queue::from(object.into_handle());
83    Ok(queue_constructor(queue))
84}
85
86fn queue_init(
87    q1: ObjID,
88    q2: ObjID,
89) -> (
90    twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
91    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
92) {
93    let rq = attach_queue::<RequestFromKernel, CompletionToKernel, _>(q1, |q| q).unwrap();
94    let sq = attach_queue::<RequestFromPager, CompletionToPager, _>(
95        q2,
96        twizzler_queue::QueueSender::new,
97    )
98    .unwrap();
99
100    return (rq, sq);
101}
102
103/***
104 * Pager Initialization generic function which calls specific initialization functions
105 */
106fn pager_init(
107    q1: ObjID,
108    q2: ObjID,
109) -> (
110    &'static twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
111    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
112    PagerData,
113) {
114    tracing_init();
115    let data = data_structure_init();
116    let (rq, sq) = queue_init(q1, q2);
117
118    let rq = unsafe { Box::into_raw(Box::new(rq)).as_ref().unwrap() };
119    tracing::debug!("init complete");
120    return (rq, sq, data);
121}
122
123struct PagerContext {
124    data: PagerData,
125    kernel_notify: &'static twizzler_queue::Queue<RequestFromKernel, CompletionToKernel>,
126    _pool: PagerThreadPool,
127
128    store: OnceLock<Ext4Store<Disk>>,
129}
130
131impl PagerContext {
132    pub fn paged_ostore(&self, _id: Option<ObjID>) -> Result<&Ext4Store<Disk>> {
133        Ok(self.store.wait())
134    }
135
136    pub fn notify_kernel(&'static self, id: u32, comp: CompletionToKernel) {
137        self.kernel_notify
138            .complete(id, comp, SubmissionFlags::empty())
139            .unwrap();
140    }
141}
142
143static PAGER_CTX: OnceLock<PagerContext> = OnceLock::new();
144
145fn do_pager_start(q1: ObjID, q2: ObjID) -> ObjID {
146    let (rq, sq, data) = pager_init(q1, q2);
147    let sq = Arc::new(sq);
148    init_pr_mgr(sq);
149    #[allow(unused_variables)]
150    let disk = run_async(Disk::new()).unwrap();
151
152    let _ = PAGER_CTX.set(PagerContext {
153        data,
154        kernel_notify: rq,
155        store: OnceLock::new(),
156        _pool: PagerThreadPool::new(rq),
157    });
158    let ctx = PAGER_CTX.get().unwrap();
159
160    #[allow(unused_variables)]
161    let virtio_store = run_async(init_virtio()).unwrap();
162    let ext4_store = run_async(Ext4Store::new(disk.clone(), "/")).unwrap();
163
164    let _ = ctx.store.set(ext4_store);
165
166    run_async(async move {
167        let _ = report_ready().await.unwrap();
168    });
169
170    tracing::info!("pager ready");
171
172    //disk::benches::bench_disk(ctx);
173    if false {
174        spawn_async(async {
175            let pager = PAGER_CTX.get().unwrap();
176            loop {
177                pager.data.print_stats();
178                pager.data.reset_stats();
179                Timer::after(Duration::from_millis(1000)).await;
180            }
181        });
182    }
183
184    let bootstrap_id = ctx.paged_ostore(None).map_or(0u128, |po| {
185        if let Ok(id) = run_async(po.get_config_id()) {
186            id
187        } else {
188            tracing::info!("creating new naming object");
189            let vo = VecObject::<u32, VecObjectAlloc>::new(ObjectBuilder::default().persist(true))
190                .unwrap();
191            run_async(po.set_config_id(vo.object().id().raw())).unwrap();
192            vo.object().id().raw()
193        }
194    });
195    tracing::info!("found root namespace: {:x}", bootstrap_id);
196
197    return bootstrap_id.into();
198}
199
200#[secgate::entry(lib = "pager")]
201pub fn pager_start(q1: ObjID, q2: ObjID) -> Result<ObjID> {
202    Ok(do_pager_start(q1, q2))
203}
204
205#[secgate::entry(lib = "pager")]
206pub fn adv_lethe() -> Result<()> {
207    run_async(PAGER_CTX.get().unwrap().paged_ostore(None)?.flush()).unwrap();
208    Ok(())
209}
210
211#[secgate::entry(lib = "pager")]
212pub fn disk_len(id: ObjID) -> Result<u64> {
213    run_async(PAGER_CTX.get().unwrap().paged_ostore(None)?.len(id.raw()))
214        // TODO: err
215        .map_err(|_| TwzError::NOT_SUPPORTED)
216}