1use std::{
2 os::fd::FromRawFd,
3 process::{Command, Stdio},
4};
5
6use async_executor::LocalExecutor;
7use async_net::{TcpListener, TcpStream};
8use embedded_io_async::{ErrorType, Read, Write};
9use futures::{AsyncReadExt, AsyncWriteExt, FutureExt};
10use miette::{Context, IntoDiagnostic};
11use sunset::{ChanHandle, SignKey};
12use sunset_async::{ProgressHolder, SSHServer};
13use tracing::Level;
14use twizzler::object::{Object, RawObject};
15use twizzler_io::pty::{DEFAULT_TERMIOS, PtyBase, PtyServerHandle};
16use twizzler_rt_abi::{
17 fd::{RawFd, twz_rt_fd_close},
18 object::ObjectCreate,
19};
20
21fn main() {
22 tracing::subscriber::set_global_default(
23 tracing_subscriber::fmt()
24 .with_max_level(Level::INFO)
25 .without_time()
26 .compact()
27 .finish(),
28 )
29 .unwrap();
30 let listener = async_io::block_on(async { TcpListener::bind("0.0.0.0:5555").await.unwrap() });
33
34 tracing::info!("ready for incomming connections");
35 for _ in 0..4 {
36 let listener = listener.clone();
37 std::thread::spawn(move || {
38 let ex = LocalExecutor::new();
39 async_io::block_on(ex.run(async { accept(&listener).await }));
40 });
41 }
42 let ex = LocalExecutor::new();
43 async_io::block_on(ex.run(async { accept(&listener).await }));
44}
45
46async fn accept(listener: &TcpListener) {
47 while let Ok(conn) = listener.accept().await {
48 tracing::info!("accepting connection from {}", conn.1);
49 match sunset_server(conn.0).await {
50 Ok(_) => {
51 tracing::info!("closed connection to {}", conn.1);
52 }
53 Err(e) => {
54 tracing::error!("error in connection to {}: {}", conn.1, e);
55 }
56 }
57 }
58}
59
60struct Reader {
61 sock: TcpStream,
62}
63
64impl ErrorType for Reader {
65 type Error = std::io::Error;
66}
67
68impl Read for Reader {
69 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
70 self.sock.read(buf).await
71 }
72}
73
74struct Writer {
75 sock: TcpStream,
76}
77
78impl Write for Writer {
79 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
80 self.sock.write(buf).await
81 }
82
83 async fn flush(&mut self) -> Result<(), Self::Error> {
84 self.sock.flush().await
85 }
86}
87
88impl ErrorType for Writer {
89 type Error = std::io::Error;
90}
91
92async fn sunset_server(conn: TcpStream) -> miette::Result<()> {
93 let mut ssh_rxbuf = Box::new([0; 4096]);
94 let mut ssh_txbuf = Box::new([0; 4096]);
95 let serv = SSHServer::new(&mut *ssh_rxbuf, &mut *ssh_txbuf);
96
97 let mut rsock = Reader { sock: conn.clone() };
98 let mut wsock = Writer { sock: conn.clone() };
99
100 let out = {
101 let (send, recv) = async_channel::bounded(1);
102
103 let runner = async {
104 serv.run(&mut rsock, &mut wsock)
105 .await
106 .into_diagnostic()
107 .with_context(|| "server-run")
108 }
109 .fuse();
110 futures::pin_mut!(runner);
111 let session = session(&serv, send).fuse();
112 futures::pin_mut!(session);
113 let shell = shell(&serv, recv).fuse();
114 futures::pin_mut!(shell);
115
116 let out = futures::select! {
117 out = runner => out,
118 out = session => out,
119 out = shell => out,
120 }?;
121 conn.shutdown(std::net::Shutdown::Read)
122 .into_diagnostic()
123 .with_context(|| "from shutdown")?;
124
125 runner.await?;
126 out
127 };
128 drop(wsock);
129 drop(rsock);
130
131 Ok(out)
132}
133
134fn setup_pty() -> (RawFd, Object<PtyBase>) {
135 let pty =
136 twizzler_io::pty::PtyBase::create_object(ObjectCreate::default(), DEFAULT_TERMIOS).unwrap();
137 let client_fd = twizzler_rt_abi::fd::twz_rt_fd_open_pty_client(pty.id().raw(), 0).unwrap();
138
139 (client_fd, pty)
140}
141
/// Everything `session` hands over to `shell` once a client has requested a
/// shell or a command on its session channel.
struct SessionCtx {
    /// Handle of the accepted session channel; `shell` uses it to obtain the
    /// channel's stdio streams.
    chan_handle: ChanHandle,
    /// Command line from an exec request, or `None` for a shell request.
    /// When present, `shell` passes it to the shell program after `-c`.
    command: Option<String>,
    /// User name recorded from the first authentication event, if one was
    /// seen. `shell` only uses it in a debug log line.
    username: Option<String>,
    /// `(name, value)` pairs collected from the client's environment
    /// requests; `shell` sets them on the spawned process.
    env: Vec<(String, String)>,
}
148
149async fn shell(
150 serv: &SSHServer<'_>,
151 ch_ch: async_channel::Receiver<SessionCtx>,
152) -> miette::Result<()> {
153 let ctx = ch_ch.recv().await.into_diagnostic()?;
154 let (stdio, _stderr) = serv.stdio_stderr(ctx.chan_handle).await.into_diagnostic()?;
155 let (mut stdin, mut stdout) = stdio.split();
156
157 let mut cmd = Command::new("/initrd/shell");
158
159 if let Some(command) = ctx.command.as_ref() {
160 cmd.arg("-c");
161 cmd.arg(command);
162 }
163
164 cmd.envs(ctx.env.into_iter());
165
166 let (client_fd, pty) = setup_pty();
167
168 unsafe {
169 cmd.stdin(Stdio::from_raw_fd(client_fd));
170 cmd.stdout(Stdio::from_raw_fd(client_fd));
171 cmd.stderr(Stdio::from_raw_fd(client_fd));
172 }
173
174 let netreader = async {
175 let mut server = PtyServerHandle::new(pty.id(), None).unwrap();
176 loop {
177 let mut buf = [0; 1024];
178 let count = stdin.read(&mut buf).await.unwrap();
179 let (_, s) = blocking::unblock(move || {
180 (
181 <PtyServerHandle as std::io::Write>::write_all(&mut server, &buf[0..count])
182 .unwrap(),
183 server,
184 )
185 })
186 .await;
187 server = s;
188 }
189 }
190 .fuse();
191
192 let netwriter = async {
193 let mut server = PtyServerHandle::new(pty.id(), None).unwrap();
194 loop {
195 let mut buf = [0; 1024];
196 let (count, buf, s) = blocking::unblock(move || {
197 (
198 <PtyServerHandle as std::io::Read>::read(&mut server, &mut buf).unwrap(),
199 buf,
200 server,
201 )
202 })
203 .await;
204 server = s;
205 stdout.write_all(&buf[0..count]).await.unwrap();
206 stdout.flush().await.unwrap();
207 }
208 }
209 .fuse();
210
211 tracing::debug!(
212 "spawning {} {:?} for {:?} (cfd = {})",
213 cmd.get_program().display(),
214 cmd.get_args(),
215 ctx.username,
216 client_fd
217 );
218 let mut handle = blocking::unblock(move || cmd.spawn())
219 .await
220 .into_diagnostic()?;
221
222 twz_rt_fd_close(client_fd);
223 let handle = blocking::unblock(move || {
224 let _ = handle.wait();
225 })
226 .fuse();
227
228 futures::pin_mut!(handle);
229 futures::pin_mut!(netreader);
230 futures::pin_mut!(netwriter);
231
232 futures::select! {
233 _ = netreader => (),
234 _ = netwriter => (),
235 _ = handle => (),
236 };
237
238 tracing::debug!("shell exited");
239 pty.handle()
240 .cmd(
241 twizzler_rt_abi::object::ObjectCmd::Delete,
242 core::ptr::null_mut::<()>(),
243 )
244 .unwrap();
245
246 Ok(())
247}
248
249async fn session(
250 serv: &SSHServer<'_>,
251 sender: async_channel::Sender<SessionCtx>,
252) -> miette::Result<()> {
253 let mut chan_handle = None;
254 let mut username = None;
255 let mut env = Vec::new();
256 loop {
257 let mut ph = ProgressHolder::new();
258 let event = serv.progress(&mut ph).await.into_diagnostic()?;
259 match event {
260 sunset::ServEvent::Hostkeys(serv_hostkeys) => {
261 let key = SignKey::generate(sunset::KeyType::Ed25519, None).into_diagnostic()?;
262 serv_hostkeys.hostkeys(&[&key]).into_diagnostic()?;
263 }
264 sunset::ServEvent::FirstAuth(serv_first_auth) => {
265 let name = serv_first_auth.username().into_diagnostic()?;
266 tracing::debug!("logging in as {}", name);
267 username = Some(name.to_string());
268 serv_first_auth.allow().into_diagnostic()?;
269 }
270 sunset::ServEvent::OpenSession(serv_open_session) => {
273 if chan_handle.is_some() {
274 serv_open_session
275 .reject(sunset::ChanFail::SSH_OPEN_ADMINISTRATIVELY_PROHIBITED)
276 .into_diagnostic()?;
277 } else {
278 let ch = serv_open_session.accept().into_diagnostic()?;
279 tracing::debug!("opened session, channel = {}", ch.num());
280 chan_handle = Some(ch);
281 }
282 }
283 sunset::ServEvent::SessionShell(serv_shell_request) => {
284 tracing::debug!("shell start on channel {}", serv_shell_request.channel());
285 if let Some(ch) = chan_handle.take() {
286 serv_shell_request.succeed().into_diagnostic()?;
287 sender
288 .send(SessionCtx {
289 chan_handle: ch,
290 command: None,
291 username: username.clone(),
292 env: env.clone(),
293 })
294 .await
295 .into_diagnostic()?;
296 } else {
297 serv_shell_request.fail().into_diagnostic()?;
298 }
299 }
300 sunset::ServEvent::SessionExec(serv_exec_request) => {
301 tracing::debug!(
302 "session exec on channel {}: {}",
303 serv_exec_request.channel(),
304 serv_exec_request.command().into_diagnostic()?
305 );
306 if let Some(ch) = chan_handle.take() {
307 let command = serv_exec_request.command().into_diagnostic()?.to_string();
308 serv_exec_request.succeed().into_diagnostic()?;
309 sender
310 .send(SessionCtx {
311 chan_handle: ch,
312 command: Some(command),
313 username: username.clone(),
314 env: env.clone(),
315 })
316 .await
317 .into_diagnostic()?;
318 } else {
319 serv_exec_request.fail().into_diagnostic()?;
320 }
321 }
322 sunset::ServEvent::SessionPty(serv_pty_request) => {
323 let ch = serv_pty_request.channel();
324 tracing::debug!("pty request on channel {}", ch);
325 serv_pty_request.succeed().into_diagnostic()?;
326 }
327 sunset::ServEvent::SessionEnv(serv_environment_request) => {
328 let name = serv_environment_request.name().into_diagnostic()?;
329 let value = serv_environment_request.value().into_diagnostic()?;
330 let ch = serv_environment_request.channel();
331 tracing::debug!("env request on channel {}: {}={}", ch, name, value);
332 env.push((name.to_string(), value.to_string()));
333 serv_environment_request.succeed().into_diagnostic()?;
334 }
335 sunset::ServEvent::PollAgain => {}
336 sunset::ServEvent::Defunct => {
337 tracing::debug!("server defunct");
338 return Ok(());
339 }
340 _ => {
341 tracing::warn!("unknown event: {:?}", event);
342 }
343 }
344 }
345}