From 8513bafad611288cdcbb194738bd3e50e05ebee7 Mon Sep 17 00:00:00 2001 From: Lucas Schumacher Date: Sat, 30 Dec 2023 11:15:44 -0500 Subject: [PATCH] Better error messages --- .gitignore | 2 ++ src/main.rs | 29 +++++++++++++++++++++-------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..40082fd 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target +perf.data +perf.data.old diff --git a/src/main.rs b/src/main.rs index 3141102..66dc4ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,12 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; -use tokio::sync::broadcast; +use tokio::sync::broadcast::{self, error::RecvError::Lagged}; #[tokio::main] async fn main() -> std::io::Result<()> { - let (tx, _rx) = broadcast::channel(128); + let (tx, _rx) = broadcast::channel(256); let listener = TcpListener::bind("127.0.0.1:8001").await?; + loop { let (stream, addr) = listener.accept().await?; println!("{addr} Connected"); @@ -21,20 +22,32 @@ async fn main() -> std::io::Result<()> { } let data = buffer[..len].to_vec(); if tx_bus.send(data).is_err() { + println!("{} tcp reader bus error", &addr); break; } } + println!("{} tcp reader exit", &addr); }); tokio::task::spawn(async move { - while let Ok(data) = rx_bus.recv().await { - let owned_data = data.to_vec(); - if tx_tcp.write_all(&owned_data).await.is_err() { - break; + loop { + match rx_bus.recv().await { + Ok(data) => { + if let Err(e) = tx_tcp.write_all(&data).await { + println!("{} tcp writer error {}", &addr, e); + break; + } + } + Err(Lagged(i)) => { + eprintln!("Warning: {} dropped {} packets", &addr, i); + } + Err(e) => { + eprintln!("rx bus error: {}", e); + break; + } } } + println!("{} tcp writer exit", &addr); }); } - - //Ok(()) }