monitor/mon/thread/
cleaner.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomPinned,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicU64, Ordering},
7        mpsc::{Receiver, Sender},
8        Arc,
9    },
10};
11
12use twizzler_abi::syscall::{
13    sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
14    ThreadSyncSleep, ThreadSyncWake,
15};
16use twizzler_rt_abi::object::ObjID;
17
18use super::ManagedThread;
19use crate::mon::get_monitor;
20
21/// Tracks threads that do not exit cleanly, so their monitor-internal resources can be cleaned up.
22pub(crate) struct ThreadCleaner {
23    _thread: std::thread::JoinHandle<()>,
24    send: Sender<WaitOp>,
25    inner: Pin<Arc<ThreadCleanerData>>,
26}
27
/// State shared between a `ThreadCleaner` handle and the cleaner thread.
#[derive(Default)]
struct ThreadCleanerData {
    /// Notification word. `notify()` stores 1 here and issues a wake on it; the cleaner thread
    /// swaps it back to 0 and sleeps on it while it still reads 0.
    notify: AtomicU64,
    /// Marker that makes this type `!Unpin`.
    // NOTE(review): presumably present because the address of `notify` is handed to
    // `sys_thread_sync` and must stay stable — confirm.
    _unpin: PhantomPinned,
}
33
34// All the threads we are tracking.
35#[derive(Default)]
36struct Waits {
37    threads: HashMap<ObjID, ManagedThread>,
38}
39
40// Changes to the collection of threads we are tracking
41enum WaitOp {
42    Add(ManagedThread),
43    Remove(ObjID),
44}
45
46impl ThreadCleaner {
47    /// Makes a new ThreadCleaner.
48    pub(crate) fn new() -> Self {
49        let (send, recv) = std::sync::mpsc::channel();
50        let data = Arc::pin(ThreadCleanerData::default());
51        let inner = data.clone();
52        let thread = std::thread::Builder::new()
53            .name("thread-exit cleanup tracker".into())
54            .spawn(move || cleaner_thread_main(data, recv))
55            .unwrap();
56        Self {
57            send,
58            inner,
59            _thread: thread,
60        }
61    }
62
63    /// Track a thread. If that thread exits, the cleanup thread will remove the exited thread from
64    /// tracking and from the global thread manager.
65    pub fn track(&self, th: ManagedThread) {
66        tracing::debug!("tracking thread {}", th.id);
67        let _ = self.send.send(WaitOp::Add(th));
68        self.inner.notify();
69    }
70
71    /// Untrack a thread. Threads removed this way do not trigger a removal from the global thread
72    /// manager.
73    pub fn untrack(&self, id: ObjID) {
74        let _ = self.send.send(WaitOp::Remove(id));
75        self.inner.notify();
76    }
77}
78
79impl ThreadCleanerData {
80    /// Notify the cleanup thread that new items are on the queue.
81    fn notify(&self) {
82        self.notify.store(1, Ordering::SeqCst);
83        let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new(
84            ThreadSyncReference::Virtual(&self.notify),
85            1,
86        ))];
87        if let Err(e) = sys_thread_sync(&mut ops, None) {
88            tracing::warn!("thread sync error when trying to notify: {}", e);
89        }
90    }
91}
92
93impl Waits {
94    fn process_queue(&mut self, recv: &mut Receiver<WaitOp>) {
95        while let Ok(wo) = recv.try_recv() {
96            match wo {
97                WaitOp::Add(th) => {
98                    self.threads.insert(th.id, th);
99                }
100                WaitOp::Remove(id) => {
101                    self.threads.remove(&id);
102                }
103            }
104        }
105    }
106}
107
108fn cleaner_thread_main(data: Pin<Arc<ThreadCleanerData>>, mut recv: Receiver<WaitOp>) {
109    // TODO (dbittman): when we have support for async thread events, we can use that API.
110    let mut ops = Vec::new();
111    let mut cleanups = Vec::new();
112    let mut waits = Waits::default();
113    loop {
114        ops.truncate(0);
115        // Apply any waiting operations.
116        waits.process_queue(&mut recv);
117
118        // Add the notify sleep op.
119        ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new(
120            ThreadSyncReference::Virtual(&data.notify),
121            0,
122            ThreadSyncOp::Equal,
123            ThreadSyncFlags::empty(),
124        )));
125
126        // Add all sleep ops for threads.
127        cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited()));
128        for th in waits.threads.values() {
129            ops.push(ThreadSync::new_sleep(th.waitable_until_exit()));
130        }
131
132        // Remove any exited threads from the thread manager.
133        for (_, th) in cleanups.drain(..) {
134            tracing::debug!("cleaning thread: {}", th.id);
135            let monitor = get_monitor();
136            {
137                let key = happylock::ThreadKey::get().unwrap();
138                let mut tmgr = monitor.thread_mgr.write(key);
139                tmgr.do_remove(&th);
140            }
141            let comps = {
142                let key = happylock::ThreadKey::get().unwrap();
143                let (_, ref mut cmgr, ref mut dynlink, _, _) = *monitor.locks.lock(key);
144                for comp in cmgr.compartments_mut() {
145                    comp.clean_per_thread_data(th.id);
146                }
147                if let Some(comp_id) = th.main_thread_comp {
148                    cmgr.main_thread_exited(comp_id);
149                }
150                cmgr.process_cleanup_queue(&mut *dynlink)
151            };
152            drop(comps);
153        }
154
155        // Check for notifications, and sleep.
156        if data.notify.swap(0, Ordering::SeqCst) == 0 {
157            // no notification, go to sleep. hold the lock over the sleep so that someone cannot
158            // modify waits.threads on us while we're asleep.
159            if let Err(e) = sys_thread_sync(&mut ops, None) {
160                tracing::warn!("thread sync error: {}", e);
161            }
162        }
163    }
164}