pager_srv/
lib.rs

1#![feature(naked_functions)]
2#![feature(io_error_more)]
3#![feature(test)]
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, Mutex, OnceLock},
8    time::Duration,
9};
10
11use async_executor::Executor;
12use async_io::{block_on, Timer};
13use disk::Disk;
14use memstore::virtio::init_virtio;
15use object_store::{Ext4Store, ExternalFile, PagedDevice, PagedObjectStore};
16use tracing_subscriber::fmt::format::FmtSpan;
17use twizzler::{
18    collections::vec::{VecObject, VecObjectAlloc},
19    object::{ObjID, Object, ObjectBuilder},
20    Result,
21};
22use twizzler_abi::pager::{
23    CompletionToKernel, CompletionToPager, PagerCompletionData, RequestFromKernel, RequestFromPager,
24};
25use twizzler_queue::{QueueBase, QueueSender};
26use twizzler_rt_abi::{error::TwzError, object::MapFlags};
27
28use crate::{data::PagerData, request_handle::handle_kernel_request};
29
30mod data;
31mod disk;
32mod handle;
33mod helpers;
34// in-progress
35#[allow(unused)]
36mod memstore;
37mod nvme;
38mod physrw;
39mod request_handle;
40mod stats;
41
42pub use handle::{pager_close_handle, pager_open_handle};
43
44pub static EXECUTOR: OnceLock<Executor> = OnceLock::new();
45
46/***
47 * Tracing Init
48 */
49fn tracing_init() {
50    tracing::subscriber::set_global_default(
51        tracing_subscriber::fmt()
52            .with_max_level(tracing::Level::INFO)
53            .with_span_events(FmtSpan::ENTER)
54            .without_time()
55            .finish(),
56    )
57    .unwrap();
58    tracing_log::LogTracer::init().unwrap();
59}
60
61/***
62 * Pager Data Structures Initialization
63 */
64fn data_structure_init() -> PagerData {
65    let pager_data = PagerData::new();
66
67    return pager_data;
68}
69
70/***
71 * Queue Initializing
72 */
73fn attach_queue<T: std::marker::Copy, U: std::marker::Copy, Q>(
74    obj_id: ObjID,
75    queue_constructor: impl FnOnce(twizzler_queue::Queue<T, U>) -> Q,
76) -> Result<Q> {
77    tracing::debug!("Pager Attaching Queue: {}", obj_id);
78
79    let object = unsafe {
80        Object::<QueueBase<T, U>>::map_unchecked(obj_id, MapFlags::READ | MapFlags::WRITE).unwrap()
81    };
82
83    tracing::debug!("queue mapped; constructing...");
84    // Ensure the object is cast or transformed to match the expected `Queue` type
85    let queue: twizzler_queue::Queue<T, U> = twizzler_queue::Queue::from(object.into_handle());
86    Ok(queue_constructor(queue))
87}
88
89fn queue_init(
90    q1: ObjID,
91    q2: ObjID,
92) -> (
93    twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>,
94    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
95) {
96    let rq = attach_queue::<RequestFromKernel, CompletionToKernel, _>(
97        q1,
98        twizzler_queue::CallbackQueueReceiver::new,
99    )
100    .unwrap();
101    let sq = attach_queue::<RequestFromPager, CompletionToPager, _>(
102        q2,
103        twizzler_queue::QueueSender::new,
104    )
105    .unwrap();
106
107    return (rq, sq);
108}
109
110/***
111 * Async Runtime Initialization
112 * Creating n threads
113 */
114fn async_runtime_init(n: i32) -> &'static Executor<'static> {
115    let ex = EXECUTOR.get_or_init(|| Executor::new());
116
117    for _ in 0..n {
118        std::thread::spawn(|| block_on(ex.run(std::future::pending::<()>())));
119    }
120
121    return ex;
122}
123
124/***
125 * Pager Initialization generic function which calls specific initialization functions
126 */
127fn pager_init(
128    q1: ObjID,
129    q2: ObjID,
130) -> (
131    twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>,
132    twizzler_queue::QueueSender<RequestFromPager, CompletionToPager>,
133    PagerData,
134    &'static Executor<'static>,
135) {
136    tracing_init();
137    let data = data_structure_init();
138    let ex = async_runtime_init(4);
139    let (rq, sq) = queue_init(q1, q2);
140
141    tracing::debug!("init complete");
142    return (rq, sq, data, ex);
143}
144
145fn spawn_queues(
146    ctx: &'static PagerContext,
147    kernel_rq: Arc<twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>>,
148    ex: &'static Executor<'static>,
149) {
150    tracing::debug!("spawning queues...");
151    ex.spawn(listen_queue(kernel_rq, ctx, handle_kernel_request, ex))
152        .detach();
153}
154
155async fn listen_queue<R, C, F, I>(
156    kernel_rq: Arc<twizzler_queue::CallbackQueueReceiver<R, C>>,
157    ctx: &'static PagerContext,
158    handler: impl Fn(&'static PagerContext, u32, R) -> F + Copy + Send + Sync + 'static,
159    _ex: &'static Executor<'static>,
160) where
161    F: std::future::Future<Output = I> + Send + 'static,
162    R: std::fmt::Debug + Copy + Send + Sync + 'static,
163    C: std::fmt::Debug + Copy + Send + Sync + 'static,
164    I: IntoIterator<Item = C> + Send + Sync + 'static,
165{
166    loop {
167        tracing::trace!("queue receiving...");
168        let (id, request) = kernel_rq.receive().await.unwrap();
169        tracing::trace!("got request: ({},{:?})", id, request);
170
171        let comp = handler(ctx, id, request).await;
172        for comp in comp {
173            notify(&kernel_rq, id, comp).await;
174        }
175    }
176}
177
178async fn notify<R, C>(q: &Arc<twizzler_queue::CallbackQueueReceiver<R, C>>, id: u32, res: C)
179where
180    R: std::fmt::Debug + Copy + Send + Sync,
181    C: std::fmt::Debug + Copy + Send + Sync + 'static,
182{
183    q.complete(id, res).await.unwrap();
184    //tracing::trace!("request {} complete", id);
185}
186
187async fn report_ready(
188    ctx: &PagerContext,
189    _ex: &'static Executor<'static>,
190) -> Option<PagerCompletionData> {
191    tracing::debug!("sending ready signal to kernel");
192    let request = RequestFromPager::new(twizzler_abi::pager::PagerRequest::Ready);
193
194    match ctx.sender.submit_and_wait(request).await {
195        Ok(completion) => {
196            tracing::debug!("received completion for ready signal: {:?}", completion);
197            return Some(completion.data());
198        }
199        Err(e) => {
200            tracing::warn!("error from ready signal {:?}", e);
201            return None;
202        }
203    }
204}
205
206struct PagerContext {
207    data: PagerData,
208    sender: Arc<QueueSender<RequestFromPager, CompletionToPager>>,
209    kernel_notify:
210        Arc<twizzler_queue::CallbackQueueReceiver<RequestFromKernel, CompletionToKernel>>,
211
212    //paged_ostore: Box<dyn PagedObjectStore<DiskPageRequest> + 'static + Sync + Send>,
213    //disk: Disk,
214    stores: Mutex<Stores>,
215}
216
217struct Stores {
218    map: HashMap<ObjID, Arc<Store>>,
219    default: ObjID,
220}
221
222impl Stores {
223    pub fn paged_ostore(&self, id: Option<ObjID>) -> Result<Arc<Store>> {
224        match id {
225            Some(id) => Ok(self.map.get(&id).ok_or(TwzError::INVALID_ARGUMENT)?.clone()),
226            None => Ok(self
227                .map
228                .get(&self.default)
229                .ok_or(TwzError::INVALID_ARGUMENT)?
230                .clone()),
231        }
232    }
233
234    pub fn insert_device(
235        &mut self,
236        store: Arc<dyn PagedObjectStore + Send + Sync + 'static>,
237        dev: Arc<dyn PagedDevice + Send + Sync + 'static>,
238    ) {
239        self.map
240            .insert(ObjID::new(0), Arc::new(Store { inner: store, dev }));
241    }
242}
243
244#[allow(dead_code)]
245struct Store {
246    inner: Arc<dyn PagedObjectStore + Send + Sync + 'static>,
247    dev: Arc<dyn PagedDevice + Send + Sync + 'static>,
248}
249
250impl PagedObjectStore for Store {
251    fn create_object(&self, id: object_store::ObjID) -> Result<()> {
252        self.inner.create_object(id)
253    }
254
255    fn delete_object(&self, id: object_store::ObjID) -> Result<()> {
256        self.inner.delete_object(id)
257    }
258
259    fn len(&self, id: object_store::ObjID) -> Result<u64> {
260        self.inner.len(id)
261    }
262
263    fn read_object(&self, id: object_store::ObjID, offset: u64, buf: &mut [u8]) -> Result<usize> {
264        self.inner.read_object(id, offset, buf)
265    }
266
267    fn write_object(&self, id: object_store::ObjID, offset: u64, buf: &[u8]) -> Result<()> {
268        self.inner.write_object(id, offset, buf)
269    }
270
271    fn get_config_id(&self) -> Result<object_store::ObjID> {
272        self.inner.get_config_id()
273    }
274
275    fn set_config_id(&self, id: object_store::ObjID) -> Result<()> {
276        self.inner.set_config_id(id)
277    }
278
279    fn flush(&self) -> Result<()> {
280        self.inner.flush()
281    }
282
283    fn page_in_object<'a>(
284        &self,
285        id: object_store::ObjID,
286        reqs: &'a mut [object_store::PageRequest],
287    ) -> Result<usize> {
288        self.inner.page_in_object(id, reqs)
289    }
290
291    fn page_out_object<'a>(
292        &self,
293        id: object_store::ObjID,
294        reqs: &'a mut [object_store::PageRequest],
295    ) -> Result<usize> {
296        self.inner.page_out_object(id, reqs)
297    }
298
299    fn enumerate_external(&self, _id: object_store::ObjID) -> Result<Vec<ExternalFile>> {
300        self.inner.enumerate_external(_id)
301    }
302
303    fn find_external(&self, _id: object_store::ObjID) -> Result<usize> {
304        self.inner.find_external(_id)
305    }
306}
307
308impl PagerContext {
309    pub fn paged_ostore(&self, id: Option<ObjID>) -> Result<Arc<Store>> {
310        self.stores.lock().unwrap().paged_ostore(id)
311    }
312
313    pub async fn enumerate_external(&'static self, id: ObjID) -> Result<Vec<ExternalFile>> {
314        blocking::unblock(move || {
315            Ok(self
316                .paged_ostore(None)?
317                .enumerate_external(id.raw())?
318                .iter()
319                .cloned()
320                .collect())
321        })
322        .await
323    }
324
325    pub async fn notify_kernel(&'static self, id: u32, comp: CompletionToKernel) {
326        notify(&self.kernel_notify, id, comp).await;
327    }
328}
329
330static PAGER_CTX: OnceLock<PagerContext> = OnceLock::new();
331
332fn do_pager_start(q1: ObjID, q2: ObjID) -> ObjID {
333    let (rq, sq, data, ex) = pager_init(q1, q2);
334    #[allow(unused_variables)]
335    let disk = block_on(ex.run(Disk::new(ex))).unwrap();
336
337    let sq = Arc::new(sq);
338    let rq = Arc::new(rq);
339    let _ = PAGER_CTX.set(PagerContext {
340        data,
341        sender: sq,
342        kernel_notify: rq.clone(),
343        stores: Mutex::new(Stores {
344            map: HashMap::new(),
345            default: ObjID::new(0),
346        }),
347    });
348    let ctx = PAGER_CTX.get().unwrap();
349
350    #[allow(unused_variables)]
351    let virtio_store = block_on(ex.run(async move { init_virtio().await })).unwrap();
352    let ext4_store = Ext4Store::new(disk.clone(), "/").unwrap();
353
354    ctx.stores
355        .lock()
356        .unwrap()
357        .insert_device(Arc::new(ext4_store), Arc::new(disk));
358
359    spawn_queues(ctx, rq, ex);
360
361    block_on(ex.run(async move {
362        let _ = report_ready(&ctx, ex).await.unwrap();
363    }));
364
365    tracing::info!("pager ready");
366
367    //disk::benches::bench_disk(ctx);
368    if false {
369        let _ = ex
370            .spawn(async {
371                let pager = PAGER_CTX.get().unwrap();
372                loop {
373                    pager.data.print_stats();
374                    pager.data.reset_stats();
375                    Timer::after(Duration::from_millis(1000)).await;
376                }
377            })
378            .detach();
379    }
380
381    let bootstrap_id = ctx.paged_ostore(None).map_or(0u128, |po| {
382        po.get_config_id().unwrap_or_else(|_| {
383            tracing::info!("creating new naming object");
384            let vo =
385                VecObject::<u32, VecObjectAlloc>::new(ObjectBuilder::default().persist()).unwrap();
386            po.set_config_id(vo.object().id().raw()).unwrap();
387            vo.object().id().raw()
388        })
389    });
390    tracing::info!("found root namespace: {:x}", bootstrap_id);
391
392    return bootstrap_id.into();
393}
394
395#[secgate::secure_gate]
396pub fn pager_start(q1: ObjID, q2: ObjID) -> Result<ObjID> {
397    Ok(do_pager_start(q1, q2))
398}
399
400#[secgate::secure_gate]
401pub fn adv_lethe() -> Result<()> {
402    PAGER_CTX
403        .get()
404        .unwrap()
405        .paged_ostore(None)?
406        .flush()
407        .unwrap();
408    Ok(())
409}
410
411#[secgate::secure_gate]
412pub fn disk_len(id: ObjID) -> Result<u64> {
413    PAGER_CTX
414        .get()
415        .unwrap()
416        .paged_ostore(None)?
417        .len(id.raw())
418        // TODO: err
419        .map_err(|_| TwzError::NOT_SUPPORTED)
420}