Switch to async implementation
This commit is contained in:
153
src/main.rs
153
src/main.rs
@@ -1,134 +1,37 @@
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::net::TcpListener;
|
||||
use std::net::TcpStream;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
mod message_bus {
|
||||
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let (tx, _rx) = broadcast::channel(128);
|
||||
let listener = TcpListener::bind("127.0.0.1:8001").await?;
|
||||
loop {
|
||||
let (stream, addr) = listener.accept().await?;
|
||||
println!("{addr} Connected");
|
||||
let (mut rx_tcp, mut tx_tcp) = stream.into_split();
|
||||
let tx_bus = tx.clone();
|
||||
let mut rx_bus = tx_bus.subscribe();
|
||||
|
||||
pub struct MsgSender(Sender<Msg>);
|
||||
impl MsgSender {
|
||||
pub fn send(&self, data: Vec<u8>) -> Result<(), SendError<Msg>> {
|
||||
self.0.send(Msg::Data(data))
|
||||
}
|
||||
}
|
||||
//pub struct MsgReceiver(Receiver<Vec<u8>>);
|
||||
pub type MsgReceiver = Receiver<Vec<u8>>;
|
||||
|
||||
pub enum Msg {
|
||||
Data(Vec<u8>),
|
||||
Subscribe(Sender<Vec<u8>>),
|
||||
}
|
||||
|
||||
pub struct MessageBus {
|
||||
input: Sender<Msg>,
|
||||
//outputs: Vec<Sender<Msg>>,
|
||||
}
|
||||
|
||||
fn output_loop(main_rx: Receiver<Msg>) {
|
||||
let mut outputs: Vec<Sender<Vec<u8>>> = vec![];
|
||||
loop {
|
||||
match main_rx.recv() {
|
||||
Ok(Msg::Data(data)) => {
|
||||
let mut i = 0;
|
||||
while i < outputs.len() {
|
||||
if outputs[i].send(data.clone()).is_err() {
|
||||
outputs.swap_remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Msg::Subscribe(sender)) => {
|
||||
outputs.push(sender);
|
||||
}
|
||||
Err(_e) => {
|
||||
println!("Error bus died")
|
||||
tokio::task::spawn(async move {
|
||||
let mut buffer = vec![0_u8; 2048];
|
||||
while let Ok(len) = rx_tcp.read(&mut buffer).await {
|
||||
let data = buffer[..len].to_vec();
|
||||
if tx_bus.send(data).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl MessageBus {
|
||||
pub fn new() -> Self {
|
||||
let (input, input_rx) = channel();
|
||||
});
|
||||
|
||||
std::thread::spawn(move || output_loop(input_rx));
|
||||
Self { input, /*outputs*/ }
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> (MsgSender, Receiver<Vec<u8>>) {
|
||||
let (tx, rx) = channel();
|
||||
let input = self.input.clone();
|
||||
|
||||
input.send(Msg::Subscribe(tx)).unwrap();
|
||||
(MsgSender(input), rx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> std::io::Result<()> {
|
||||
let bus = message_bus::MessageBus::new();
|
||||
|
||||
/*
|
||||
let (tx1, rx1) = bus.subscribe();
|
||||
let (tx2, rx2) = bus.subscribe();
|
||||
|
||||
let payload = "Hello world!".to_string().as_bytes().to_vec();
|
||||
|
||||
tx1.send(payload.clone()).unwrap();
|
||||
assert_eq!(rx2.recv().unwrap(), rx1.recv().unwrap());
|
||||
|
||||
println!("sleeping 5");
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
println!("dropping rx2 and sleeping 5");
|
||||
drop(rx2);
|
||||
drop(tx2);
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
println!("sending payload and sleeping 5");
|
||||
tx1.send(payload.clone()).unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
println!("creating new rx2 and sleeping 5");
|
||||
let (tx2, rx2) = bus.subscribe();
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
println!("sending payload and sleeping 5");
|
||||
tx2.send(payload.clone()).unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
*/
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:8001")?;
|
||||
for stream in listener.incoming() {
|
||||
match stream {
|
||||
Ok(tx_tcp) => {
|
||||
let (tx_bus, rx_bus) = bus.subscribe();
|
||||
let rx_tcp = tx_tcp.try_clone().unwrap();
|
||||
|
||||
std::thread::spawn(move || publish_thread(rx_tcp, tx_bus));
|
||||
|
||||
std::thread::spawn(move || consume_thread(tx_tcp, rx_bus));
|
||||
tokio::task::spawn(async move {
|
||||
while let Ok(data) = rx_bus.recv().await {
|
||||
let owned_data = data.to_vec();
|
||||
if tx_tcp.write_all(&owned_data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error accepting connection: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn publish_thread(mut rx_tcp: TcpStream, tx_bus: message_bus::MsgSender) {
|
||||
let mut buffer = vec![0_u8; 2048];
|
||||
while let Ok(len) = rx_tcp.read(&mut buffer) {
|
||||
let data = buffer[..len].to_vec();
|
||||
if tx_bus.send(data).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn consume_thread(mut tx_tcp: TcpStream, rx_bus: message_bus::MsgReceiver) {
|
||||
while let Ok(data) = rx_bus.recv() {
|
||||
if tx_tcp.write_all(&data).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
//Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user