From 13610f3470f38cf6ee3e714dedb01b62b31a7ce7 Mon Sep 17 00:00:00 2001 From: Lucas Schumacher Date: Thu, 28 Dec 2023 15:28:36 -0500 Subject: [PATCH] Buggy but mostly working std implementation --- .gitignore | 1 + Cargo.lock | 7 +++ Cargo.toml | 8 ++++ src/main.rs | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..e537e9d --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "kissdummy" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..09098fe --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "kissdummy" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..522dd90 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,134 @@ +use std::io::Read; +use std::io::Write; +use std::net::TcpListener; +use std::net::TcpStream; + +mod message_bus { + use std::sync::mpsc::{channel, Receiver, SendError, Sender}; + + pub struct MsgSender(Sender); + impl MsgSender { + pub fn send(&self, data: Vec) -> Result<(), SendError> { + self.0.send(Msg::Data(data)) + } + } + //pub struct MsgReceiver(Receiver>); + pub type MsgReceiver = Receiver>; + + pub enum Msg { + Data(Vec), + Subscribe(Sender>), + } + + pub struct MessageBus { + input: Sender, + //outputs: Vec>, + } + + fn output_loop(main_rx: Receiver) { + let mut outputs: Vec>> = vec![]; + loop { + match main_rx.recv() { + Ok(Msg::Data(data)) => { + let mut i = 0; + while i < outputs.len() { + if outputs[i].send(data.clone()).is_err() { + outputs.swap_remove(i); + } else { + i += 1; + } + } + } + Ok(Msg::Subscribe(sender)) => { + outputs.push(sender); + } + Err(_e) => { + println!("Error bus died") + } + } + } + } + impl MessageBus { + pub fn new() -> Self { + let (input, input_rx) = channel(); + + std::thread::spawn(move || output_loop(input_rx)); + Self { input, /*outputs*/ } + } + + pub fn subscribe(&self) -> (MsgSender, Receiver>) { + let (tx, rx) = channel(); + let input = self.input.clone(); + + input.send(Msg::Subscribe(tx)).unwrap(); + (MsgSender(input), rx) + } + } +} + +fn main() -> std::io::Result<()> { + let bus = message_bus::MessageBus::new(); + + /* + let (tx1, rx1) = bus.subscribe(); + let (tx2, rx2) = bus.subscribe(); + + let payload = "Hello world!".to_string().as_bytes().to_vec(); + + tx1.send(payload.clone()).unwrap(); + assert_eq!(rx2.recv().unwrap(), rx1.recv().unwrap()); + + println!("sleeping 5"); + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("dropping rx2 and sleeping 5"); + drop(rx2); + drop(tx2); + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("sending payload and sleeping 5"); + tx1.send(payload.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("creating new rx2 and sleeping 5"); + let (tx2, rx2) = bus.subscribe(); + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("sending payload and sleeping 5"); + tx2.send(payload.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_secs(5)); + */ + + let listener = TcpListener::bind("127.0.0.1:8001")?; + for stream in listener.incoming() { + match stream { + Ok(tx_tcp) => { + let (tx_bus, rx_bus) = bus.subscribe(); + let rx_tcp = tx_tcp.try_clone().unwrap(); + + std::thread::spawn(move || publish_thread(rx_tcp, tx_bus)); + + std::thread::spawn(move || consume_thread(tx_tcp, rx_bus)); + } + Err(e) => { + println!("Error accepting connection: {}", e); + } + } + } + + Ok(()) +} + +fn publish_thread(mut rx_tcp: TcpStream, tx_bus: message_bus::MsgSender) { + let mut buffer = vec![0_u8; 2048]; + while let Ok(len) = rx_tcp.read(&mut buffer) { + let data = buffer[..len].to_vec(); + if tx_bus.send(data).is_err() { + break; + } + } +} + +fn consume_thread(mut tx_tcp: TcpStream, rx_bus: message_bus::MsgReceiver) { + while let Ok(data) = rx_bus.recv() { + if tx_tcp.write_all(&data).is_err() { + break; + } + } +}