use std::{
    fmt::Debug,
    sync::{
        Condvar, Mutex,
        atomic::{AtomicU64, Ordering},
    },
    thread::Builder,
    time::Instant,
    usize,
};

use miette::IntoDiagnostic;
use monitor_api::{CompartmentFlags, CompartmentHandle};
use twizzler::{
    BaseType, Invariant,
    object::{MapFlags, ObjID, Object, ObjectBuilder, RawObject, TypedObject},
};
use twizzler_abi::{
    syscall::{
        ObjectCreate, PERTHREAD_TRACE_GEN_SAMPLE, ThreadSync, ThreadSyncFlags, ThreadSyncOp,
        ThreadSyncReference, ThreadSyncSleep, ThreadSyncWake, TraceSpec, sys_ktrace,
        sys_thread_change_state, sys_thread_self_id, sys_thread_set_trace_events, sys_thread_sync,
    },
    thread::ExecutionState,
    trace::{TraceBase, TraceData, TraceEntryFlags, TraceEntryHead},
};

use crate::Cli;

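/// Lifecycle of a tracing session: the collector thread moves the state from
/// `Setup` to `Ready` once it has started, the control thread flips it to
/// `Running` when the traced compartment is resumed, and to `Done` once the
/// compartment exits.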
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Setup,
    Ready,
    Running,
    Done,
}

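/// A chain of trace buffer objects from a single producer (the kernel, or an
/// optional user-space source), along with the consumption cursor
/// (`end_point`) and a running count of bytes collected so far (`total`).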
pub struct TraceSource {
    objects: Vec<Object<BaseWrap>>,
    end_point: u64,
    pub total: u64,
}

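/// All state for one tracing run: the kernel trace source, an optional
/// user-space trace source, the session lifecycle, timing information, and
/// bookkeeping about the collector thread.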
pub struct TracingState {
    pub kernel_source: TraceSource,
    pub user_source: Option<TraceSource>,
    state: State,
    pub start_time: Instant,
    pub end_time: Instant,
    pub name: String,
    pub nr_wakes: usize,
    pub collector_id: ObjID,
}

impl Debug for TraceSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TraceSource {{ {} objects, end_point: {}, total: {} }}",
            self.objects.len(),
            self.end_point,
            self.total,
        )
    }
}

impl Debug for TracingState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TracingState {{ kernel_source: {:?}, user_source: {:?}, state: {:?}, start_time: {:?}, end_time: {:?}, name: {:?} }}",
            self.kernel_source,
            self.user_source,
            self.state,
            self.start_time,
            self.end_time,
            self.name,
        )
    }
}

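/// Transparent wrapper around [`TraceBase`] so the `BaseType` and `Invariant`
/// derives can be applied locally, letting the trace header serve as the base
/// of a Twizzler object.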
#[derive(BaseType, Invariant)]
#[repr(transparent)]
pub struct BaseWrap(pub TraceBase);

impl TracingState {
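    /// Create a new tracing state: build the primary kernel trace object,
    /// register each `TraceSpec` with the kernel via `sys_ktrace`, and wrap
    /// the optional user-space trace object as a second source.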
    fn new(
        name: String,
        specs: &[TraceSpec],
        user_prime: Option<Object<BaseWrap>>,
    ) -> miette::Result<Self> {
        let prime = ObjectBuilder::new(ObjectCreate::default())
            .build(BaseWrap(TraceBase {
                start: 0,
                end: AtomicU64::new(0),
            }))
            .into_diagnostic()?;

        for spec in specs {
            sys_ktrace(prime.id(), Some(spec)).into_diagnostic()?;
        }

        let user_source = user_prime.map(|up| TraceSource {
            objects: vec![up],
            end_point: 0,
            total: 0,
        });

        let kernel_source = TraceSource {
            objects: vec![prime],
            end_point: 0,
            total: 0,
        };

        Ok(Self {
            kernel_source,
            user_source,
            state: State::Setup,
            start_time: Instant::now(),
            end_time: Instant::now(),
            name,
            nr_wakes: 0,
            collector_id: 0.into(),
        })
    }

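    /// Drain any newly posted trace data from both sources, returning sleep
    /// specifications that become ready when either source posts more data.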
    fn collect(&mut self) -> miette::Result<[Option<ThreadSyncSleep>; 2]> {
        let s1 = self.kernel_source.collect()?;
        let s2 = self.user_source.as_mut().and_then(|us| us.collect().ok());
        Ok([Some(s1), s2])
    }

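    /// Iterate over all collected trace entries, kernel entries first and then
    /// any user-space entries, as `(header, optional data)` pairs.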
    pub fn data(&self) -> impl Iterator<Item = (&'_ TraceEntryHead, Option<&'_ TraceData<()>>)> {
        self.kernel_source.data().chain(
            self.user_source
                .as_ref()
                .map(|us| us.data())
                .unwrap_or(TraceDataIter::empty()),
        )
    }
}

impl TraceSource {
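    /// Walk the current trace object from the last consumed position up to the
    /// end position posted by the producer. If an entry is flagged with
    /// `NEXT_OBJECT`, map the next object in the chain and continue there.
    /// Returns a sleep specification on the current object's `end` word so the
    /// caller can block until more data is posted.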
    fn collect(&mut self) -> miette::Result<ThreadSyncSleep> {
        let mut current = self.objects.last().unwrap();
        let posted_end = current.base().0.end.load(Ordering::SeqCst);
        let start_point = self.end_point.max(current.base().0.start);
        tracing::trace!(
            "collect {:x}: {:x} {:x}: {}",
            self.end_point,
            posted_end,
            start_point,
            self.objects.len()
        );
        if self.end_point != posted_end {
            let amount = posted_end.saturating_sub(start_point);

            if amount > 0 {
                self.total += amount;

                let mut offset = 0usize;
                while offset < amount as usize {
                    let header = current
                        .lea(start_point as usize + offset, size_of::<TraceEntryHead>())
                        .unwrap()
                        .cast::<TraceEntryHead>();
                    let header = unsafe { &*header };
                    if header.flags.contains(TraceEntryFlags::NEXT_OBJECT) {
                        tracing::debug!("got next tracing object: {}", header.extra_or_next);
                        let next = unsafe {
                            Object::<BaseWrap>::map_unchecked(header.extra_or_next, MapFlags::READ)
                                .into_diagnostic()
                        }?;
                        self.objects.push(next);
                        current = self.objects.last().unwrap();
                        self.end_point = current.base().0.start;
                        return self.collect();
                    } else {
                        offset += size_of::<TraceEntryHead>();
                        if header.flags.contains(TraceEntryFlags::HAS_DATA) {
                            let data_header = current
                                .lea(start_point as usize + offset, size_of::<TraceData<()>>())
                                .unwrap()
                                .cast::<TraceData<()>>();
                            offset += (unsafe { *data_header }).len as usize;
                        }
                    }
                }
                if offset == amount as usize {
                    self.end_point += amount;
                }
            }
        }

        Ok(ThreadSyncSleep::new(
            ThreadSyncReference::Virtual(&current.base().0.end),
            start_point,
            ThreadSyncOp::Equal,
            ThreadSyncFlags::empty(),
        ))
    }

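    /// Iterate over every entry collected so far from this source.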
    pub fn data(&self) -> TraceDataIter<'_> {
        TraceDataIter {
            state: Some(self),
            pos: 0,
            inner_pos: 0,
        }
    }
}

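/// Iterator over the `(TraceEntryHead, Option<TraceData>)` pairs stored in a
/// [`TraceSource`], following the chain of trace objects in order. `pos`
/// indexes the object chain and `inner_pos` is the byte offset within the
/// current object.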
pub struct TraceDataIter<'a> {
    state: Option<&'a TraceSource>,
    pos: usize,
    inner_pos: u64,
}

impl TraceDataIter<'_> {
    pub fn empty() -> Self {
        Self {
            state: None,
            pos: 0,
            inner_pos: 0,
        }
    }
}

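/// Shared state between the control thread and the collector thread: the
/// tracing state behind a mutex, the trace specifications, a condition
/// variable for state changes, and an atomic word used to signal exit via
/// thread-sync.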
#[allow(dead_code)]
struct Tracer {
    state: Mutex<TracingState>,
    specs: Vec<TraceSpec>,
    state_cv: Condvar,
    notifier: AtomicU64,
}

impl<'a> Iterator for TraceDataIter<'a> {
    type Item = (&'a TraceEntryHead, Option<&'a TraceData<()>>);

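    // Read the entry header at `inner_pos` in the current object; if the entry
    // carries data, also read the trailing `TraceData` record. When fewer than
    // `size_of::<TraceEntryHead>()` bytes remain before the posted end, advance
    // to the next object in the chain.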
    fn next(&mut self) -> Option<Self::Item> {
        if self.state.is_none() {
            return None;
        }
        let obj = self.state.as_ref().unwrap().objects.get(self.pos)?;
        let start_pos = self.inner_pos.max(obj.base().0.start);
        self.inner_pos = start_pos;
        let end = obj.base().0.end.load(Ordering::SeqCst);
        if start_pos + size_of::<TraceEntryHead>() as u64 > end {
            self.pos += 1;
            self.inner_pos = 0;
            return self.next();
        }
        let mut len = size_of::<TraceEntryHead>();
        let header = obj
            .lea(start_pos as usize, len)
            .unwrap()
            .cast::<TraceEntryHead>();
        let header = unsafe { header.as_ref().unwrap() };
        let data = if header.flags.contains(TraceEntryFlags::HAS_DATA) {
            let data_header = obj
                .lea(
                    start_pos as usize + size_of::<TraceEntryHead>(),
                    size_of::<TraceData<()>>(),
                )
                .unwrap()
                .cast::<TraceData<()>>();
            let data_header = unsafe { data_header.as_ref().unwrap() };
            let data = obj
                .lea(
                    start_pos as usize + size_of::<TraceEntryHead>(),
                    data_header.len as usize,
                )
                .unwrap()
                .cast::<TraceData<()>>();
            let data = unsafe { data.as_ref().unwrap() };
            len += data.len as usize;
            Some(data)
        } else {
            None
        };

        self.inner_pos += len as u64;

        Some((header, data))
    }
}

impl Tracer {
    fn set_state(&self, new_state: State) {
        tracing::trace!("setting tracing state: {:?}", new_state);
        let mut guard = self.state.lock().unwrap();
        guard.state = new_state;
        self.state_cv.notify_all();
    }

    fn wait_for(&self, target_state: State) {
        tracing::trace!("wait for tracing state: {:?}", target_state);
        let mut guard = self.state.lock().unwrap();
        while guard.state != target_state {
            guard = self.state_cv.wait(guard).unwrap();
        }
    }

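    /// Signal the collector that tracing is over: set the notifier word and
    /// wake any thread sleeping on it via thread-sync.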
    fn notify_exit(&self) {
        let wake = ThreadSyncWake::new(ThreadSyncReference::Virtual(&self.notifier), usize::MAX);
        self.notifier.store(1, Ordering::SeqCst);
        let _ = sys_thread_sync(&mut [ThreadSync::new_wake(wake)], None).inspect_err(|e| {
            tracing::warn!("failed to notify exit: {}", e);
        });
    }
}

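/// Body of the collector thread: repeatedly drains the trace sources, then
/// sleeps until either source posts new data or the control thread signals
/// exit. On exit it disables kernel tracing, performs a final collection, and
/// records how many times it woke up.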
fn collector(tracer: &Tracer) {
    tracer.state.lock().unwrap().collector_id = sys_thread_self_id();
    tracer.set_state(State::Ready);
    let mut nr_wakes = 0;
    loop {
        let mut guard = tracer.state.lock().unwrap();
        let Ok(waiter) = guard.collect().inspect_err(|e| {
            tracing::error!("failed to collect trace data: {}", e);
        }) else {
            continue;
        };

        if tracer.notifier.load(Ordering::SeqCst) == 0 {
            drop(guard);
            let mut waiters = [
                ThreadSync::new_sleep(waiter[0].unwrap()),
                ThreadSync::new_sleep(ThreadSyncSleep::new(
                    ThreadSyncReference::Virtual(&tracer.notifier),
                    0,
                    ThreadSyncOp::Equal,
                    ThreadSyncFlags::empty(),
                )),
                ThreadSync::new_sleep(waiter[1].unwrap_or(ThreadSyncSleep::new(
                    ThreadSyncReference::Virtual(core::ptr::null()),
                    0,
                    ThreadSyncOp::Equal,
                    ThreadSyncFlags::empty(),
                ))),
            ];
            let mut waiters = waiters.as_mut_slice();
            tracing::trace!(
                "collector is waiting for data: {} {} {}",
                waiters[0].ready(),
                waiters[1].ready(),
                if waiter[1].is_some() {
                    if waiters[2].ready() { "true" } else { "false" }
                } else {
                    "-"
                }
            );
            if waiter[1].is_none() {
                waiters = &mut waiters[0..2];
            }
            if waiters.iter().all(|w| !w.ready()) {
                let _ = sys_thread_sync(waiters, None).inspect_err(|e| {
                    tracing::warn!("failed to thread sync: {}", e);
                });
                nr_wakes += 1;
            }
        } else {
            tracing::trace!("collector was notified of exit");

            let _ = sys_ktrace(guard.kernel_source.objects.first().unwrap().id(), None)
                .inspect_err(|e| {
                    tracing::error!("failed to disable tracing: {}", e);
                });
            let _ = guard.collect().inspect_err(|e| {
                tracing::error!("failed to collect trace data: {}", e);
            });
            if guard.state == State::Done {
                break;
            }
            drop(tracer.state_cv.wait(guard).unwrap());
        }
    }
    tracer.state.lock().unwrap().nr_wakes = nr_wakes;
}

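/// Spawn the collector thread, resume the traced compartment's threads, and
/// block until the compartment exits; the finished [`TracingState`] is then
/// returned for post-processing.
///
/// A minimal usage sketch (the `cli`, `comp`, and `specs` values are assumed
/// to be built elsewhere by the caller; only the call shape is shown):
///
/// ```ignore
/// let state = start(&cli, comp, specs, None)?;
/// println!(
///     "collected {} bytes over {} wakeups",
///     state.kernel_source.total,
///     state.nr_wakes
/// );
/// for (_head, _data) in state.data() {
///     // process each trace entry here
/// }
/// ```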
pub fn start(
    cli: &Cli,
    comp: CompartmentHandle,
    specs: Vec<TraceSpec>,
    rt_trace: Option<Object<BaseWrap>>,
) -> miette::Result<TracingState> {
    let state = TracingState::new(comp.info().name, specs.as_slice(), rt_trace)?;

    let tracer = Tracer {
        state: Mutex::new(state),
        specs,
        state_cv: Condvar::new(),
        notifier: AtomicU64::new(0),
    };
    std::thread::scope(|scope| {
        let th_collector = Builder::new()
            .name("trace-collector".to_owned())
            .spawn_scoped(scope, || collector(&tracer))
            .into_diagnostic()?;

        tracer.wait_for(State::Ready);

        let start = Instant::now();
        for thread in comp.threads() {
            let id: ObjID = thread.repr_id;
            tracing::debug!("resuming compartment thread {}", id);
            sys_thread_change_state(id, ExecutionState::Running).into_diagnostic()?;
            if cli.prog.sample {
                tracing::debug!("setting per-thread sampling for {}", id);
                sys_thread_set_trace_events(id, PERTHREAD_TRACE_GEN_SAMPLE).into_diagnostic()?;
            }
        }
        tracer.set_state(State::Running);

        let mut flags = comp.info().flags;
        while !flags.contains(CompartmentFlags::EXITED) {
            flags = comp.wait(flags);
        }
        let end = Instant::now();
        tracing::debug!(
            "compartment exited after {:2.2}s",
            (end - start).as_secs_f32()
        );
        tracer.state.lock().unwrap().start_time = start;
        tracer.state.lock().unwrap().end_time = end;

        tracer.set_state(State::Done);
        tracer.notify_exit();

        th_collector.join().unwrap();

        std::io::Result::Ok(()).into_diagnostic()
    })?;
    tracer.state.into_inner().into_diagnostic()
}