Buggy but mostly working std implementation

This commit is contained in:
Lucas Schumacher 2023-12-28 15:28:36 -05:00
commit 13610f3470
4 changed files with 150 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

7
Cargo.lock generated Normal file
View File

@ -0,0 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "kissdummy"
version = "0.1.0"

8
Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "kissdummy"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

134
src/main.rs Normal file
View File

@ -0,0 +1,134 @@
use std::io::Read;
use std::io::Write;
use std::net::TcpListener;
use std::net::TcpStream;
mod message_bus {
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
pub struct MsgSender(Sender<Msg>);
impl MsgSender {
pub fn send(&self, data: Vec<u8>) -> Result<(), SendError<Msg>> {
self.0.send(Msg::Data(data))
}
}
//pub struct MsgReceiver(Receiver<Vec<u8>>);
pub type MsgReceiver = Receiver<Vec<u8>>;
pub enum Msg {
Data(Vec<u8>),
Subscribe(Sender<Vec<u8>>),
}
pub struct MessageBus {
input: Sender<Msg>,
//outputs: Vec<Sender<Msg>>,
}
fn output_loop(main_rx: Receiver<Msg>) {
let mut outputs: Vec<Sender<Vec<u8>>> = vec![];
loop {
match main_rx.recv() {
Ok(Msg::Data(data)) => {
let mut i = 0;
while i < outputs.len() {
if outputs[i].send(data.clone()).is_err() {
outputs.swap_remove(i);
} else {
i += 1;
}
}
}
Ok(Msg::Subscribe(sender)) => {
outputs.push(sender);
}
Err(_e) => {
println!("Error bus died")
}
}
}
}
impl MessageBus {
pub fn new() -> Self {
let (input, input_rx) = channel();
std::thread::spawn(move || output_loop(input_rx));
Self { input, /*outputs*/ }
}
pub fn subscribe(&self) -> (MsgSender, Receiver<Vec<u8>>) {
let (tx, rx) = channel();
let input = self.input.clone();
input.send(Msg::Subscribe(tx)).unwrap();
(MsgSender(input), rx)
}
}
}
fn main() -> std::io::Result<()> {
let bus = message_bus::MessageBus::new();
/*
let (tx1, rx1) = bus.subscribe();
let (tx2, rx2) = bus.subscribe();
let payload = "Hello world!".to_string().as_bytes().to_vec();
tx1.send(payload.clone()).unwrap();
assert_eq!(rx2.recv().unwrap(), rx1.recv().unwrap());
println!("sleeping 5");
std::thread::sleep(std::time::Duration::from_secs(5));
println!("dropping rx2 and sleeping 5");
drop(rx2);
drop(tx2);
std::thread::sleep(std::time::Duration::from_secs(5));
println!("sending payload and sleeping 5");
tx1.send(payload.clone()).unwrap();
std::thread::sleep(std::time::Duration::from_secs(5));
println!("creating new rx2 and sleeping 5");
let (tx2, rx2) = bus.subscribe();
std::thread::sleep(std::time::Duration::from_secs(5));
println!("sending payload and sleeping 5");
tx2.send(payload.clone()).unwrap();
std::thread::sleep(std::time::Duration::from_secs(5));
*/
let listener = TcpListener::bind("127.0.0.1:8001")?;
for stream in listener.incoming() {
match stream {
Ok(tx_tcp) => {
let (tx_bus, rx_bus) = bus.subscribe();
let rx_tcp = tx_tcp.try_clone().unwrap();
std::thread::spawn(move || publish_thread(rx_tcp, tx_bus));
std::thread::spawn(move || consume_thread(tx_tcp, rx_bus));
}
Err(e) => {
println!("Error accepting connection: {}", e);
}
}
}
Ok(())
}
fn publish_thread(mut rx_tcp: TcpStream, tx_bus: message_bus::MsgSender) {
let mut buffer = vec![0_u8; 2048];
while let Ok(len) = rx_tcp.read(&mut buffer) {
let data = buffer[..len].to_vec();
if tx_bus.send(data).is_err() {
break;
}
}
}
fn consume_thread(mut tx_tcp: TcpStream, rx_bus: message_bus::MsgReceiver) {
while let Ok(data) = rx_bus.recv() {
if tx_tcp.write_all(&data).is_err() {
break;
}
}
}