sshd/
main.rs

1use std::{
2    os::fd::FromRawFd,
3    process::{Command, Stdio},
4};
5
6use async_executor::LocalExecutor;
7use async_net::{TcpListener, TcpStream};
8use embedded_io_async::{ErrorType, Read, Write};
9use futures::{AsyncReadExt, AsyncWriteExt, FutureExt};
10use miette::{Context, IntoDiagnostic};
11use sunset::{ChanHandle, SignKey};
12use sunset_async::{ProgressHolder, SSHServer};
13use tracing::Level;
14use twizzler::object::{Object, RawObject};
15use twizzler_io::pty::{DEFAULT_TERMIOS, PtyBase, PtyServerHandle};
16use twizzler_rt_abi::{
17    fd::{RawFd, twz_rt_fd_close},
18    object::ObjectCreate,
19};
20
21fn main() {
22    tracing::subscriber::set_global_default(
23        tracing_subscriber::fmt()
24            .with_max_level(Level::INFO)
25            .without_time()
26            .compact()
27            .finish(),
28    )
29    .unwrap();
30    //tracing_log::LogTracer::init().unwrap();
31
32    let listener = async_io::block_on(async { TcpListener::bind("0.0.0.0:5555").await.unwrap() });
33
34    tracing::info!("ready for incomming connections");
35    for _ in 0..4 {
36        let listener = listener.clone();
37        std::thread::spawn(move || {
38            let ex = LocalExecutor::new();
39            async_io::block_on(ex.run(async { accept(&listener).await }));
40        });
41    }
42    let ex = LocalExecutor::new();
43    async_io::block_on(ex.run(async { accept(&listener).await }));
44}
45
46async fn accept(listener: &TcpListener) {
47    while let Ok(conn) = listener.accept().await {
48        tracing::info!("accepting connection from {}", conn.1);
49        match sunset_server(conn.0).await {
50            Ok(_) => {
51                tracing::info!("closed connection to {}", conn.1);
52            }
53            Err(e) => {
54                tracing::error!("error in connection to {}: {}", conn.1, e);
55            }
56        }
57    }
58}
59
60struct Reader {
61    sock: TcpStream,
62}
63
64impl ErrorType for Reader {
65    type Error = std::io::Error;
66}
67
68impl Read for Reader {
69    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
70        self.sock.read(buf).await
71    }
72}
73
74struct Writer {
75    sock: TcpStream,
76}
77
78impl Write for Writer {
79    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
80        self.sock.write(buf).await
81    }
82
83    async fn flush(&mut self) -> Result<(), Self::Error> {
84        self.sock.flush().await
85    }
86}
87
88impl ErrorType for Writer {
89    type Error = std::io::Error;
90}
91
92async fn sunset_server(conn: TcpStream) -> miette::Result<()> {
93    let mut ssh_rxbuf = Box::new([0; 4096]);
94    let mut ssh_txbuf = Box::new([0; 4096]);
95    let serv = SSHServer::new(&mut *ssh_rxbuf, &mut *ssh_txbuf);
96
97    let mut rsock = Reader { sock: conn.clone() };
98    let mut wsock = Writer { sock: conn.clone() };
99
100    let out = {
101        let (send, recv) = async_channel::bounded(1);
102
103        let runner = async {
104            serv.run(&mut rsock, &mut wsock)
105                .await
106                .into_diagnostic()
107                .with_context(|| "server-run")
108        }
109        .fuse();
110        futures::pin_mut!(runner);
111        let session = session(&serv, send).fuse();
112        futures::pin_mut!(session);
113        let shell = shell(&serv, recv).fuse();
114        futures::pin_mut!(shell);
115
116        let out = futures::select! {
117            out = runner => out,
118            out = session => out,
119            out = shell => out,
120        }?;
121        conn.shutdown(std::net::Shutdown::Read)
122            .into_diagnostic()
123            .with_context(|| "from shutdown")?;
124
125        runner.await?;
126        out
127    };
128    drop(wsock);
129    drop(rsock);
130
131    Ok(out)
132}
133
134fn setup_pty() -> (RawFd, Object<PtyBase>) {
135    let pty =
136        twizzler_io::pty::PtyBase::create_object(ObjectCreate::default(), DEFAULT_TERMIOS).unwrap();
137    let client_fd = twizzler_rt_abi::fd::twz_rt_fd_open_pty_client(pty.id().raw(), 0).unwrap();
138
139    (client_fd, pty)
140}
141
142struct SessionCtx {
143    chan_handle: ChanHandle,
144    command: Option<String>,
145    username: Option<String>,
146    env: Vec<(String, String)>,
147}
148
149async fn shell(
150    serv: &SSHServer<'_>,
151    ch_ch: async_channel::Receiver<SessionCtx>,
152) -> miette::Result<()> {
153    let ctx = ch_ch.recv().await.into_diagnostic()?;
154    let (stdio, _stderr) = serv.stdio_stderr(ctx.chan_handle).await.into_diagnostic()?;
155    let (mut stdin, mut stdout) = stdio.split();
156
157    let mut cmd = Command::new("/initrd/shell");
158
159    if let Some(command) = ctx.command.as_ref() {
160        cmd.arg("-c");
161        cmd.arg(command);
162    }
163
164    cmd.envs(ctx.env.into_iter());
165
166    let (client_fd, pty) = setup_pty();
167
168    unsafe {
169        cmd.stdin(Stdio::from_raw_fd(client_fd));
170        cmd.stdout(Stdio::from_raw_fd(client_fd));
171        cmd.stderr(Stdio::from_raw_fd(client_fd));
172    }
173
174    let netreader = async {
175        let mut server = PtyServerHandle::new(pty.id(), None).unwrap();
176        loop {
177            let mut buf = [0; 1024];
178            let count = stdin.read(&mut buf).await.unwrap();
179            let (_, s) = blocking::unblock(move || {
180                (
181                    <PtyServerHandle as std::io::Write>::write_all(&mut server, &buf[0..count])
182                        .unwrap(),
183                    server,
184                )
185            })
186            .await;
187            server = s;
188        }
189    }
190    .fuse();
191
192    let netwriter = async {
193        let mut server = PtyServerHandle::new(pty.id(), None).unwrap();
194        loop {
195            let mut buf = [0; 1024];
196            let (count, buf, s) = blocking::unblock(move || {
197                (
198                    <PtyServerHandle as std::io::Read>::read(&mut server, &mut buf).unwrap(),
199                    buf,
200                    server,
201                )
202            })
203            .await;
204            server = s;
205            stdout.write_all(&buf[0..count]).await.unwrap();
206            stdout.flush().await.unwrap();
207        }
208    }
209    .fuse();
210
211    tracing::debug!(
212        "spawning {} {:?} for {:?} (cfd = {})",
213        cmd.get_program().display(),
214        cmd.get_args(),
215        ctx.username,
216        client_fd
217    );
218    let mut handle = blocking::unblock(move || cmd.spawn())
219        .await
220        .into_diagnostic()?;
221
222    twz_rt_fd_close(client_fd);
223    let handle = blocking::unblock(move || {
224        let _ = handle.wait();
225    })
226    .fuse();
227
228    futures::pin_mut!(handle);
229    futures::pin_mut!(netreader);
230    futures::pin_mut!(netwriter);
231
232    futures::select! {
233        _ = netreader => (),
234        _ = netwriter => (),
235        _ = handle => (),
236    };
237
238    tracing::debug!("shell exited");
239    pty.handle()
240        .cmd(
241            twizzler_rt_abi::object::ObjectCmd::Delete,
242            core::ptr::null_mut::<()>(),
243        )
244        .unwrap();
245
246    Ok(())
247}
248
249async fn session(
250    serv: &SSHServer<'_>,
251    sender: async_channel::Sender<SessionCtx>,
252) -> miette::Result<()> {
253    let mut chan_handle = None;
254    let mut username = None;
255    let mut env = Vec::new();
256    loop {
257        let mut ph = ProgressHolder::new();
258        let event = serv.progress(&mut ph).await.into_diagnostic()?;
259        match event {
260            sunset::ServEvent::Hostkeys(serv_hostkeys) => {
261                let key = SignKey::generate(sunset::KeyType::Ed25519, None).into_diagnostic()?;
262                serv_hostkeys.hostkeys(&[&key]).into_diagnostic()?;
263            }
264            sunset::ServEvent::FirstAuth(serv_first_auth) => {
265                let name = serv_first_auth.username().into_diagnostic()?;
266                tracing::debug!("logging in as {}", name);
267                username = Some(name.to_string());
268                serv_first_auth.allow().into_diagnostic()?;
269            }
270            //sunset::ServEvent::PasswordAuth(serv_password_auth) => todo!(),
271            //sunset::ServEvent::PubkeyAuth(serv_pubkey_auth) => todo!(),
272            sunset::ServEvent::OpenSession(serv_open_session) => {
273                if chan_handle.is_some() {
274                    serv_open_session
275                        .reject(sunset::ChanFail::SSH_OPEN_ADMINISTRATIVELY_PROHIBITED)
276                        .into_diagnostic()?;
277                } else {
278                    let ch = serv_open_session.accept().into_diagnostic()?;
279                    tracing::debug!("opened session, channel = {}", ch.num());
280                    chan_handle = Some(ch);
281                }
282            }
283            sunset::ServEvent::SessionShell(serv_shell_request) => {
284                tracing::debug!("shell start on channel {}", serv_shell_request.channel());
285                if let Some(ch) = chan_handle.take() {
286                    serv_shell_request.succeed().into_diagnostic()?;
287                    sender
288                        .send(SessionCtx {
289                            chan_handle: ch,
290                            command: None,
291                            username: username.clone(),
292                            env: env.clone(),
293                        })
294                        .await
295                        .into_diagnostic()?;
296                } else {
297                    serv_shell_request.fail().into_diagnostic()?;
298                }
299            }
300            sunset::ServEvent::SessionExec(serv_exec_request) => {
301                tracing::debug!(
302                    "session exec on channel {}: {}",
303                    serv_exec_request.channel(),
304                    serv_exec_request.command().into_diagnostic()?
305                );
306                if let Some(ch) = chan_handle.take() {
307                    let command = serv_exec_request.command().into_diagnostic()?.to_string();
308                    serv_exec_request.succeed().into_diagnostic()?;
309                    sender
310                        .send(SessionCtx {
311                            chan_handle: ch,
312                            command: Some(command),
313                            username: username.clone(),
314                            env: env.clone(),
315                        })
316                        .await
317                        .into_diagnostic()?;
318                } else {
319                    serv_exec_request.fail().into_diagnostic()?;
320                }
321            }
322            sunset::ServEvent::SessionPty(serv_pty_request) => {
323                let ch = serv_pty_request.channel();
324                tracing::debug!("pty request on channel {}", ch);
325                serv_pty_request.succeed().into_diagnostic()?;
326            }
327            sunset::ServEvent::SessionEnv(serv_environment_request) => {
328                let name = serv_environment_request.name().into_diagnostic()?;
329                let value = serv_environment_request.value().into_diagnostic()?;
330                let ch = serv_environment_request.channel();
331                tracing::debug!("env request on channel {}: {}={}", ch, name, value);
332                env.push((name.to_string(), value.to_string()));
333                serv_environment_request.succeed().into_diagnostic()?;
334            }
335            sunset::ServEvent::PollAgain => {}
336            sunset::ServEvent::Defunct => {
337                tracing::debug!("server defunct");
338                return Ok(());
339            }
340            _ => {
341                tracing::warn!("unknown event: {:?}", event);
342            }
343        }
344    }
345}