trace/
tracer.rs

1use std::{
2    fmt::Debug,
3    sync::{
4        Condvar, Mutex,
5        atomic::{AtomicU64, Ordering},
6    },
7    thread::Builder,
8    time::Instant,
9    usize,
10};
11
12use miette::IntoDiagnostic;
13use monitor_api::{CompartmentFlags, CompartmentHandle};
14use twizzler::{
15    BaseType, Invariant,
16    object::{MapFlags, ObjID, Object, ObjectBuilder, RawObject, TypedObject},
17};
18use twizzler_abi::{
19    syscall::{
20        ObjectCreate, PERTHREAD_TRACE_GEN_SAMPLE, ThreadSync, ThreadSyncFlags, ThreadSyncOp,
21        ThreadSyncReference, ThreadSyncSleep, ThreadSyncWake, TraceSpec, sys_ktrace,
22        sys_thread_change_state, sys_thread_self_id, sys_thread_set_trace_events, sys_thread_sync,
23    },
24    thread::ExecutionState,
25    trace::{TraceBase, TraceData, TraceEntryFlags, TraceEntryHead},
26};
27
28use crate::Cli;
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31enum State {
32    Setup,
33    Ready,
34    Running,
35    Done,
36}
37
38pub struct TraceSource {
39    objects: Vec<Object<BaseWrap>>,
40    end_point: u64,
41    pub total: u64,
42}
43
44pub struct TracingState {
45    pub kernel_source: TraceSource,
46    pub user_source: Option<TraceSource>,
47    state: State,
48    pub start_time: Instant,
49    pub end_time: Instant,
50    pub name: String,
51    pub nr_wakes: usize,
52    pub collector_id: ObjID,
53}
54
55impl Debug for TraceSource {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(
58            f,
59            "TraceSource {{ {} objects, end_point: {}, total: {} }}",
60            self.objects.len(),
61            self.end_point,
62            self.total,
63        )
64    }
65}
66
67impl Debug for TracingState {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(
70            f,
71            "TracingState {{ kernel_source: {:?}, user_source: {:?}, state: {:?}, start_time: {:?}, end_time: {:?}, name: {:?} }}",
72            self.kernel_source,
73            self.user_source,
74            self.state,
75            self.start_time,
76            self.end_time,
77            self.name,
78        )
79    }
80}
81
82#[derive(BaseType, Invariant)]
83#[repr(transparent)]
84pub struct BaseWrap(pub TraceBase);
85
86impl TracingState {
87    fn new(
88        name: String,
89        specs: &[TraceSpec],
90        user_prime: Option<Object<BaseWrap>>,
91    ) -> miette::Result<Self> {
92        let prime = ObjectBuilder::new(ObjectCreate::default())
93            .build(BaseWrap(TraceBase {
94                start: 0,
95                end: AtomicU64::new(0),
96            }))
97            .into_diagnostic()?;
98
99        for spec in specs {
100            sys_ktrace(prime.id(), Some(spec)).into_diagnostic()?;
101        }
102
103        let user_source = user_prime.map(|up| TraceSource {
104            objects: vec![up],
105            end_point: 0,
106            total: 0,
107        });
108
109        let kernel_source = TraceSource {
110            objects: vec![prime],
111            end_point: 0,
112            total: 0,
113        };
114
115        Ok(Self {
116            kernel_source,
117            user_source,
118            state: State::Setup,
119            start_time: Instant::now(),
120            end_time: Instant::now(),
121            name,
122            nr_wakes: 0,
123            collector_id: 0.into(),
124        })
125    }
126
127    fn collect(&mut self) -> miette::Result<[Option<ThreadSyncSleep>; 2]> {
128        let s1 = self.kernel_source.collect()?;
129        let s2 = self.user_source.as_mut().and_then(|us| us.collect().ok());
130        Ok([Some(s1), s2])
131    }
132
133    pub fn data(&self) -> impl Iterator<Item = (&'_ TraceEntryHead, Option<&'_ TraceData<()>>)> {
134        self.kernel_source.data().chain(
135            self.user_source
136                .as_ref()
137                .map(|us| us.data())
138                .unwrap_or(TraceDataIter::empty()),
139        )
140    }
141}
142
143impl TraceSource {
144    fn collect(&mut self) -> miette::Result<ThreadSyncSleep> {
145        let mut current = self.objects.last().unwrap();
146        let posted_end = current.base().0.end.load(Ordering::SeqCst);
147        let start_point = self.end_point.max(current.base().0.start);
148        tracing::trace!(
149            "collect {:x}: {:x} {:x}: {}",
150            self.end_point,
151            posted_end,
152            start_point,
153            self.objects.len()
154        );
155        if self.end_point != posted_end {
156            let amount = posted_end.saturating_sub(start_point);
157
158            if amount > 0 {
159                self.total += amount;
160
161                // scan for next object directives
162                let mut offset = 0usize;
163                while offset < amount as usize {
164                    let header = current
165                        .lea(start_point as usize + offset, size_of::<TraceEntryHead>())
166                        .unwrap()
167                        .cast::<TraceEntryHead>();
168                    let header = unsafe { &*header };
169                    if header.flags.contains(TraceEntryFlags::NEXT_OBJECT) {
170                        tracing::debug!("got next tracing object: {}", header.extra_or_next);
171                        let next = unsafe {
172                            Object::<BaseWrap>::map_unchecked(header.extra_or_next, MapFlags::READ)
173                                .into_diagnostic()
174                        }?;
175                        self.objects.push(next);
176                        current = self.objects.last().unwrap();
177                        self.end_point = current.base().0.start;
178                        return self.collect();
179                    } else {
180                        offset += size_of::<TraceEntryHead>();
181                        if header.flags.contains(TraceEntryFlags::HAS_DATA) {
182                            let data_header = current
183                                .lea(start_point as usize + offset, size_of::<TraceData<()>>())
184                                .unwrap()
185                                .cast::<TraceData<()>>();
186                            offset += (unsafe { *data_header }).len as usize;
187                        }
188                    }
189                }
190                if offset == amount as usize {
191                    self.end_point += amount;
192                }
193            }
194        }
195
196        Ok(ThreadSyncSleep::new(
197            ThreadSyncReference::Virtual(&current.base().0.end),
198            start_point,
199            ThreadSyncOp::Equal,
200            ThreadSyncFlags::empty(),
201        ))
202    }
203
204    pub fn data(&self) -> TraceDataIter<'_> {
205        TraceDataIter {
206            state: Some(self),
207            pos: 0,
208            inner_pos: 0,
209        }
210    }
211}
212
213pub struct TraceDataIter<'a> {
214    state: Option<&'a TraceSource>,
215    pos: usize,
216    inner_pos: u64,
217}
218
219impl TraceDataIter<'_> {
220    pub fn empty() -> Self {
221        Self {
222            state: None,
223            pos: 0,
224            inner_pos: 0,
225        }
226    }
227}
228
229#[allow(dead_code)]
230struct Tracer {
231    state: Mutex<TracingState>,
232    specs: Vec<TraceSpec>,
233    state_cv: Condvar,
234    notifier: AtomicU64,
235}
236
237impl<'a> Iterator for TraceDataIter<'a> {
238    type Item = (&'a TraceEntryHead, Option<&'a TraceData<()>>);
239
240    fn next(&mut self) -> Option<Self::Item> {
241        if self.state.is_none() {
242            return None;
243        }
244        let obj = self.state.as_ref().unwrap().objects.get(self.pos)?;
245        let start_pos = self.inner_pos.max(obj.base().0.start);
246        self.inner_pos = start_pos;
247        let end = obj.base().0.end.load(Ordering::SeqCst);
248        if start_pos + size_of::<TraceEntryHead>() as u64 > end {
249            self.pos += 1;
250            self.inner_pos = 0;
251            return self.next();
252        }
253        let mut len = size_of::<TraceEntryHead>();
254        let header = obj
255            .lea(start_pos as usize, len)
256            .unwrap()
257            .cast::<TraceEntryHead>();
258        let header = unsafe { header.as_ref().unwrap() };
259        let data = if header.flags.contains(TraceEntryFlags::HAS_DATA) {
260            let data_header = obj
261                .lea(
262                    start_pos as usize + size_of::<TraceEntryHead>(),
263                    size_of::<TraceData<()>>(),
264                )
265                .unwrap()
266                .cast::<TraceData<()>>();
267            let data_header = unsafe { data_header.as_ref().unwrap() };
268            let data = obj
269                .lea(
270                    start_pos as usize + size_of::<TraceEntryHead>(),
271                    data_header.len as usize,
272                )
273                .unwrap()
274                .cast::<TraceData<()>>();
275            let data = unsafe { data.as_ref().unwrap() };
276            len += data.len as usize;
277            Some(data)
278        } else {
279            None
280        };
281
282        self.inner_pos += len as u64;
283
284        Some((header, data))
285    }
286}
287
288impl Tracer {
289    fn set_state(&self, new_state: State) {
290        tracing::trace!("setting tracing state: {:?}", new_state);
291        let mut guard = self.state.lock().unwrap();
292        guard.state = new_state;
293        self.state_cv.notify_all();
294    }
295
296    fn wait_for(&self, target_state: State) {
297        tracing::trace!("wait for tracing state: {:?}", target_state);
298        let mut guard = self.state.lock().unwrap();
299        while guard.state != target_state {
300            guard = self.state_cv.wait(guard).unwrap();
301        }
302    }
303
304    fn notify_exit(&self) {
305        let wake = ThreadSyncWake::new(ThreadSyncReference::Virtual(&self.notifier), usize::MAX);
306        self.notifier.store(1, Ordering::SeqCst);
307        let _ = sys_thread_sync(&mut [ThreadSync::new_wake(wake)], None).inspect_err(|e| {
308            tracing::warn!("failed to notify exit: {}", e);
309        });
310    }
311}
312
313fn collector(tracer: &Tracer) {
314    tracer.state.lock().unwrap().collector_id = sys_thread_self_id();
315    tracer.set_state(State::Ready);
316    let mut nr_wakes = 0;
317    loop {
318        let mut guard = tracer.state.lock().unwrap();
319        let Ok(waiter) = guard.collect().inspect_err(|e| {
320            tracing::error!("failed to collect trace data: {}", e);
321        }) else {
322            continue;
323        };
324
325        if tracer.notifier.load(Ordering::SeqCst) == 0 {
326            drop(guard);
327            let mut waiters = [
328                ThreadSync::new_sleep(waiter[0].unwrap()),
329                ThreadSync::new_sleep(ThreadSyncSleep::new(
330                    ThreadSyncReference::Virtual(&tracer.notifier),
331                    0,
332                    ThreadSyncOp::Equal,
333                    ThreadSyncFlags::empty(),
334                )),
335                ThreadSync::new_sleep(waiter[1].unwrap_or(ThreadSyncSleep::new(
336                    ThreadSyncReference::Virtual(core::ptr::null()),
337                    0,
338                    ThreadSyncOp::Equal,
339                    ThreadSyncFlags::empty(),
340                ))),
341            ];
342            let mut waiters = waiters.as_mut_slice();
343            tracing::trace!(
344                "collector is waiting for data: {} {} {}",
345                waiters[0].ready(),
346                waiters[1].ready(),
347                if waiter[1].is_some() {
348                    if waiters[2].ready() { "true" } else { "false" }
349                } else {
350                    "-"
351                }
352            );
353            if waiter[1].is_none() {
354                waiters = &mut waiters[0..2];
355            }
356            if waiters.iter().all(|w| !w.ready()) {
357                let _ = sys_thread_sync(waiters, None).inspect_err(|e| {
358                    tracing::warn!("failed to thread sync: {}", e);
359                });
360                nr_wakes += 1;
361            }
362        } else {
363            tracing::trace!("collector was notified of exit");
364
365            let _ = sys_ktrace(guard.kernel_source.objects.first().unwrap().id(), None)
366                .inspect_err(|e| {
367                    tracing::error!("failed to disable tracing: {}", e);
368                });
369            let _ = guard.collect().inspect_err(|e| {
370                tracing::error!("failed to collect trace data: {}", e);
371            });
372            if guard.state == State::Done {
373                break;
374            }
375            drop(tracer.state_cv.wait(guard).unwrap());
376        }
377    }
378    tracer.state.lock().unwrap().nr_wakes = nr_wakes;
379}
380
381pub fn start(
382    cli: &Cli,
383    comp: CompartmentHandle,
384    specs: Vec<TraceSpec>,
385    rt_trace: Option<Object<BaseWrap>>,
386) -> miette::Result<TracingState> {
387    let state = TracingState::new(comp.info().name, specs.as_slice(), rt_trace)?;
388
389    let tracer = Tracer {
390        state: Mutex::new(state),
391        specs,
392        state_cv: Condvar::new(),
393        notifier: AtomicU64::new(0),
394    };
395    std::thread::scope(|scope| {
396        let th_collector = Builder::new()
397            .name("trace-collector".to_owned())
398            .spawn_scoped(scope, || collector(&tracer))
399            .into_diagnostic()?;
400
401        tracer.wait_for(State::Ready);
402
403        let start = Instant::now();
404        for thread in comp.threads() {
405            let id: ObjID = thread.repr_id;
406            tracing::debug!("resuming compartment thread {}", id);
407            sys_thread_change_state(id, ExecutionState::Running).into_diagnostic()?;
408            if cli.prog.sample {
409                tracing::debug!("setting per-thread sampling for {}", id);
410                sys_thread_set_trace_events(id, PERTHREAD_TRACE_GEN_SAMPLE).into_diagnostic()?;
411            }
412        }
413        tracer.set_state(State::Running);
414
415        let mut flags = comp.info().flags;
416        while !flags.contains(CompartmentFlags::EXITED) {
417            flags = comp.wait(flags);
418        }
419        let end = Instant::now();
420        tracing::debug!(
421            "compartment exited after {:2.2}s",
422            (end - start).as_secs_f32()
423        );
424        tracer.state.lock().unwrap().start_time = start;
425        tracer.state.lock().unwrap().end_time = end;
426
427        tracer.set_state(State::Done);
428        tracer.notify_exit();
429
430        th_collector.join().unwrap();
431
432        std::io::Result::Ok(()).into_diagnostic()
433    })?;
434    tracer.state.into_inner().into_diagnostic()
435}