monitor/mon/thread/
cleaner.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomPinned,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicU64, Ordering},
7        mpsc::{Receiver, Sender},
8        Arc,
9    },
10    time::Duration,
11};
12
13use secgate::TwzError;
14use twizzler_abi::syscall::{
15    sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
16    ThreadSyncSleep, ThreadSyncWake,
17};
18use twizzler_rt_abi::object::ObjID;
19
20use super::ManagedThread;
21use crate::mon::get_monitor;
22
23/// Tracks threads that do not exit cleanly, so their monitor-internal resources can be cleaned up.
24pub(crate) struct ThreadCleaner {
25    _thread: std::thread::JoinHandle<()>,
26    send: Sender<WaitOp>,
27    inner: Pin<Arc<ThreadCleanerData>>,
28}
29
/// State shared between a `ThreadCleaner` handle and its cleanup thread.
#[derive(Default)]
struct ThreadCleanerData {
    /// Notification word: starts at 0, is set to 1 when new operations have been queued, and is
    /// reset to 0 by the cleanup thread before it goes to sleep.
    notify: AtomicU64,
    /// Opts this type out of `Unpin`.
    // NOTE(review): presumably because the address of `notify` is handed to the kernel as a
    // thread-sync reference and therefore must not move — confirm.
    _unpin: PhantomPinned,
}
35
36// All the threads we are tracking.
37#[derive(Default)]
38struct Waits {
39    threads: HashMap<ObjID, ManagedThread>,
40}
41
42// Changes to the collection of threads we are tracking
43enum WaitOp {
44    Add(ManagedThread),
45    Remove(ObjID),
46}
47
48impl ThreadCleaner {
49    /// Makes a new ThreadCleaner.
50    pub(crate) fn new() -> Self {
51        let (send, recv) = std::sync::mpsc::channel();
52        let data = Arc::pin(ThreadCleanerData::default());
53        let inner = data.clone();
54        let thread = std::thread::Builder::new()
55            .name("thread-exit cleanup tracker".into())
56            .spawn(move || cleaner_thread_main(data, recv))
57            .unwrap();
58        Self {
59            send,
60            inner,
61            _thread: thread,
62        }
63    }
64
65    /// Track a thread. If that thread exits, the cleanup thread will remove the exited thread from
66    /// tracking and from the global thread manager.
67    pub fn track(&self, th: ManagedThread) {
68        tracing::debug!("tracking thread {}", th.id);
69        let _ = self.send.send(WaitOp::Add(th));
70        self.inner.notify();
71    }
72
73    /// Untrack a thread. Threads removed this way do not trigger a removal from the global thread
74    /// manager.
75    pub fn untrack(&self, id: ObjID) {
76        let _ = self.send.send(WaitOp::Remove(id));
77        self.inner.notify();
78    }
79}
80
81impl ThreadCleanerData {
82    /// Notify the cleanup thread that new items are on the queue.
83    fn notify(&self) {
84        self.notify.store(1, Ordering::SeqCst);
85        let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new(
86            ThreadSyncReference::Virtual(&self.notify),
87            1,
88        ))];
89        if let Err(e) = sys_thread_sync(&mut ops, None) {
90            tracing::warn!("thread sync error when trying to notify: {}", e);
91        }
92    }
93}
94
95impl Waits {
96    fn process_queue(&mut self, recv: &mut Receiver<WaitOp>) -> bool {
97        let mut did_work = false;
98        while let Ok(wo) = recv.try_recv() {
99            did_work = true;
100            match wo {
101                WaitOp::Add(th) => {
102                    self.threads.insert(th.id, th);
103                }
104                WaitOp::Remove(id) => {
105                    self.threads.remove(&id);
106                }
107            }
108        }
109        did_work
110    }
111}
112
113fn cleaner_thread_main(data: Pin<Arc<ThreadCleanerData>>, mut recv: Receiver<WaitOp>) {
114    // TODO (dbittman): when we have support for async thread events, we can use that API.
115    let mut ops = Vec::new();
116    let mut cleanups = Vec::new();
117    let mut waits = Waits::default();
118    loop {
119        ops.truncate(0);
120        // Apply any waiting operations.
121        let mut did_work = waits.process_queue(&mut recv);
122
123        // Add the notify sleep op.
124        ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new(
125            ThreadSyncReference::Virtual(&data.notify),
126            0,
127            ThreadSyncOp::Equal,
128            ThreadSyncFlags::empty(),
129        )));
130
131        // Add all sleep ops for threads.
132        cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited()));
133
134        if !cleanups.is_empty() {
135            did_work = true;
136        }
137        // Remove any exited threads from the thread manager.
138        for (_, th) in cleanups.drain(..) {
139            tracing::debug!("cleaning thread: {}", th.id);
140            let monitor = get_monitor();
141            {
142                let key = happylock::ThreadKey::get().unwrap();
143                let mut tmgr = monitor.thread_mgr.write(key);
144                tmgr.do_remove(&th);
145            }
146            let comps = {
147                let key = happylock::ThreadKey::get().unwrap();
148                let (_, ref mut cmgr, ref mut dynlink, _, _) = *monitor.locks.lock(key);
149                for comp in cmgr.compartments_mut() {
150                    comp.clean_per_thread_data(th.id);
151                }
152                if let Some(comp_id) = th.main_thread_comp {
153                    cmgr.main_thread_exited(comp_id);
154                }
155                cmgr.process_cleanup_queue(&mut *dynlink)
156            };
157            drop(comps);
158        }
159
160        for th in waits.threads.values() {
161            ops.push(ThreadSync::new_sleep(th.waitable_until_exit()));
162        }
163        did_work |= waits.threads.values().any(|v| v.has_exited());
164
165        // Check for notifications, and sleep.
166        if !did_work && data.notify.swap(0, Ordering::SeqCst) == 0 {
167            // no notification, go to sleep. hold the lock over the sleep so that someone cannot
168            // modify waits.threads on us while we're asleep.
169            if let Err(e) = sys_thread_sync(&mut ops, Some(Duration::from_secs(8))) {
170                if e != TwzError::TIMED_OUT {
171                    tracing::warn!("thread sync error: {}", e);
172                }
173            }
174        }
175    }
176}