monitor/mon/thread/
cleaner.rs1use std::{
2 collections::HashMap,
3 marker::PhantomPinned,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicU64, Ordering},
7 mpsc::{Receiver, Sender},
8 Arc,
9 },
10};
11
12use twizzler_abi::syscall::{
13 sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
14 ThreadSyncSleep, ThreadSyncWake,
15};
16use twizzler_rt_abi::object::ObjID;
17
18use super::ManagedThread;
19use crate::mon::get_monitor;
20
21pub(crate) struct ThreadCleaner {
23 _thread: std::thread::JoinHandle<()>,
24 send: Sender<WaitOp>,
25 inner: Pin<Arc<ThreadCleanerData>>,
26}
27
28#[derive(Default)]
29struct ThreadCleanerData {
30 notify: AtomicU64,
31 _unpin: PhantomPinned,
32}
33
34#[derive(Default)]
36struct Waits {
37 threads: HashMap<ObjID, ManagedThread>,
38}
39
40enum WaitOp {
42 Add(ManagedThread),
43 Remove(ObjID),
44}
45
46impl ThreadCleaner {
47 pub(crate) fn new() -> Self {
49 let (send, recv) = std::sync::mpsc::channel();
50 let data = Arc::pin(ThreadCleanerData::default());
51 let inner = data.clone();
52 let thread = std::thread::Builder::new()
53 .name("thread-exit cleanup tracker".into())
54 .spawn(move || cleaner_thread_main(data, recv))
55 .unwrap();
56 Self {
57 send,
58 inner,
59 _thread: thread,
60 }
61 }
62
63 pub fn track(&self, th: ManagedThread) {
66 tracing::debug!("tracking thread {}", th.id);
67 let _ = self.send.send(WaitOp::Add(th));
68 self.inner.notify();
69 }
70
71 pub fn untrack(&self, id: ObjID) {
74 let _ = self.send.send(WaitOp::Remove(id));
75 self.inner.notify();
76 }
77}
78
79impl ThreadCleanerData {
80 fn notify(&self) {
82 self.notify.store(1, Ordering::SeqCst);
83 let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new(
84 ThreadSyncReference::Virtual(&self.notify),
85 1,
86 ))];
87 if let Err(e) = sys_thread_sync(&mut ops, None) {
88 tracing::warn!("thread sync error when trying to notify: {}", e);
89 }
90 }
91}
92
93impl Waits {
94 fn process_queue(&mut self, recv: &mut Receiver<WaitOp>) {
95 while let Ok(wo) = recv.try_recv() {
96 match wo {
97 WaitOp::Add(th) => {
98 self.threads.insert(th.id, th);
99 }
100 WaitOp::Remove(id) => {
101 self.threads.remove(&id);
102 }
103 }
104 }
105 }
106}
107
108fn cleaner_thread_main(data: Pin<Arc<ThreadCleanerData>>, mut recv: Receiver<WaitOp>) {
109 let mut ops = Vec::new();
111 let mut cleanups = Vec::new();
112 let mut waits = Waits::default();
113 loop {
114 ops.truncate(0);
115 waits.process_queue(&mut recv);
117
118 ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new(
120 ThreadSyncReference::Virtual(&data.notify),
121 0,
122 ThreadSyncOp::Equal,
123 ThreadSyncFlags::empty(),
124 )));
125
126 cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited()));
128 for th in waits.threads.values() {
129 ops.push(ThreadSync::new_sleep(th.waitable_until_exit()));
130 }
131
132 for (_, th) in cleanups.drain(..) {
134 tracing::debug!("cleaning thread: {}", th.id);
135 let monitor = get_monitor();
136 {
137 let key = happylock::ThreadKey::get().unwrap();
138 let mut tmgr = monitor.thread_mgr.write(key);
139 tmgr.do_remove(&th);
140 }
141 let comps = {
142 let key = happylock::ThreadKey::get().unwrap();
143 let (_, ref mut cmgr, ref mut dynlink, _, _) = *monitor.locks.lock(key);
144 for comp in cmgr.compartments_mut() {
145 comp.clean_per_thread_data(th.id);
146 }
147 if let Some(comp_id) = th.main_thread_comp {
148 cmgr.main_thread_exited(comp_id);
149 }
150 cmgr.process_cleanup_queue(&mut *dynlink)
151 };
152 drop(comps);
153 }
154
155 if data.notify.swap(0, Ordering::SeqCst) == 0 {
157 if let Err(e) = sys_thread_sync(&mut ops, None) {
160 tracing::warn!("thread sync error: {}", e);
161 }
162 }
163 }
164}