net_srv/
client.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc, Mutex, OnceLock,
6    },
7    thread::JoinHandle,
8};
9
10use smoltcp::{
11    phy::{Device, RxToken},
12    time::Instant,
13    wire::{EthernetFrame, PrettyPrinter},
14};
15use twizzler_abi::syscall::{sys_thread_sync, ThreadSync};
16use twizzler_net::NetServer;
17use virtio_net::TxBuffer;
18
19use crate::NETINFO;
20
21pub struct Client {
22    pub ep: Mutex<NetServer>,
23    jh: OnceLock<JoinHandle<()>>,
24    pub active: AtomicBool,
25    pub ports: Mutex<HashMap<u16, usize>>,
26}
27
28impl Client {
29    pub fn new(ep: NetServer) -> Arc<Self> {
30        let client = Arc::new(Client {
31            ep: Mutex::new(ep),
32            jh: OnceLock::new(),
33            active: AtomicBool::new(true),
34            ports: Mutex::new(HashMap::new()),
35        });
36        let _client = client.clone();
37        let jh = std::thread::spawn(move || client_thread(_client));
38        client.jh.set(jh).unwrap();
39        client
40    }
41
42    fn active(&self) -> bool {
43        self.active.load(Ordering::SeqCst)
44    }
45}
46
47fn client_thread(client: Arc<Client>) {
48    let device = NETINFO.get().unwrap().device.clone();
49    let tx_po = client.ep.lock().unwrap().client_tx_packet_object().clone();
50    while client.active() {
51        let mut ep = client.ep.lock().unwrap();
52        while let Some((rx, _tx)) = ep.receive(Instant::now()) {
53            let packet = rx.packet;
54            rx.consume(|buf| {
55                if false {
56                    let f = EthernetFrame::new_unchecked(&mut *buf);
57                    let pp = PrettyPrinter::<EthernetFrame<&mut [u8]>>::print(&f);
58                    eprintln!("client thread got {}", pp);
59                }
60                let tx = TxBuffer::from_packet(tx_po.clone(), buf.len(), packet, false);
61                device.transmit(tx);
62
63                //if let Some(dtx) = device.transmit(Instant::now()) {
64                //    dtx.consume(buf.len(), |dbuf| dbuf.copy_from_slice(buf));
65                //  }
66            })
67        }
68
69        let rx_waiter = ep.rx_waiter();
70        if ep.has_pending_msg_from_client() {
71            continue;
72        }
73        drop(ep);
74
75        let _ = sys_thread_sync(&mut [ThreadSync::new_sleep(rx_waiter)], None);
76    }
77}