monitor/mon/thread/
cleaner.rs1use std::{
2 collections::HashMap,
3 marker::PhantomPinned,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicU64, Ordering},
7 mpsc::{Receiver, Sender},
8 Arc,
9 },
10 time::Duration,
11};
12
13use secgate::TwzError;
14use twizzler_abi::syscall::{
15 sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
16 ThreadSyncSleep, ThreadSyncWake,
17};
18use twizzler_rt_abi::object::ObjID;
19
20use super::ManagedThread;
21use crate::mon::get_monitor;
22
23pub(crate) struct ThreadCleaner {
25 _thread: std::thread::JoinHandle<()>,
26 send: Sender<WaitOp>,
27 inner: Pin<Arc<ThreadCleanerData>>,
28}
29
30#[derive(Default)]
31struct ThreadCleanerData {
32 notify: AtomicU64,
33 _unpin: PhantomPinned,
34}
35
36#[derive(Default)]
38struct Waits {
39 threads: HashMap<ObjID, ManagedThread>,
40}
41
42enum WaitOp {
44 Add(ManagedThread),
45 Remove(ObjID),
46}
47
48impl ThreadCleaner {
49 pub(crate) fn new() -> Self {
51 let (send, recv) = std::sync::mpsc::channel();
52 let data = Arc::pin(ThreadCleanerData::default());
53 let inner = data.clone();
54 let thread = std::thread::Builder::new()
55 .name("thread-exit cleanup tracker".into())
56 .spawn(move || cleaner_thread_main(data, recv))
57 .unwrap();
58 Self {
59 send,
60 inner,
61 _thread: thread,
62 }
63 }
64
65 pub fn track(&self, th: ManagedThread) {
68 tracing::debug!("tracking thread {}", th.id);
69 let _ = self.send.send(WaitOp::Add(th));
70 self.inner.notify();
71 }
72
73 pub fn untrack(&self, id: ObjID) {
76 let _ = self.send.send(WaitOp::Remove(id));
77 self.inner.notify();
78 }
79}
80
81impl ThreadCleanerData {
82 fn notify(&self) {
84 self.notify.store(1, Ordering::SeqCst);
85 let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new(
86 ThreadSyncReference::Virtual(&self.notify),
87 1,
88 ))];
89 if let Err(e) = sys_thread_sync(&mut ops, None) {
90 tracing::warn!("thread sync error when trying to notify: {}", e);
91 }
92 }
93}
94
95impl Waits {
96 fn process_queue(&mut self, recv: &mut Receiver<WaitOp>) -> bool {
97 let mut did_work = false;
98 while let Ok(wo) = recv.try_recv() {
99 did_work = true;
100 match wo {
101 WaitOp::Add(th) => {
102 self.threads.insert(th.id, th);
103 }
104 WaitOp::Remove(id) => {
105 self.threads.remove(&id);
106 }
107 }
108 }
109 did_work
110 }
111}
112
113fn cleaner_thread_main(data: Pin<Arc<ThreadCleanerData>>, mut recv: Receiver<WaitOp>) {
114 let mut ops = Vec::new();
116 let mut cleanups = Vec::new();
117 let mut waits = Waits::default();
118 loop {
119 ops.truncate(0);
120 let mut did_work = waits.process_queue(&mut recv);
122
123 ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new(
125 ThreadSyncReference::Virtual(&data.notify),
126 0,
127 ThreadSyncOp::Equal,
128 ThreadSyncFlags::empty(),
129 )));
130
131 cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited()));
133
134 if !cleanups.is_empty() {
135 did_work = true;
136 }
137 for (_, th) in cleanups.drain(..) {
139 tracing::debug!("cleaning thread: {}", th.id);
140 let monitor = get_monitor();
141 {
142 let key = happylock::ThreadKey::get().unwrap();
143 let mut tmgr = monitor.thread_mgr.write(key);
144 tmgr.do_remove(&th);
145 }
146 let comps = {
147 let key = happylock::ThreadKey::get().unwrap();
148 let (_, ref mut cmgr, ref mut dynlink, _, _) = *monitor.locks.lock(key);
149 for comp in cmgr.compartments_mut() {
150 comp.clean_per_thread_data(th.id);
151 }
152 if let Some(comp_id) = th.main_thread_comp {
153 cmgr.main_thread_exited(comp_id);
154 }
155 cmgr.process_cleanup_queue(&mut *dynlink)
156 };
157 drop(comps);
158 }
159
160 for th in waits.threads.values() {
161 ops.push(ThreadSync::new_sleep(th.waitable_until_exit()));
162 }
163 did_work |= waits.threads.values().any(|v| v.has_exited());
164
165 if !did_work && data.notify.swap(0, Ordering::SeqCst) == 0 {
167 if let Err(e) = sys_thread_sync(&mut ops, Some(Duration::from_secs(8))) {
170 if e != TwzError::TIMED_OUT {
171 tracing::warn!("thread sync error: {}", e);
172 }
173 }
174 }
175 }
176}