From 4724d23459a3471dd0c72442cbcc54d7c717ba45 Mon Sep 17 00:00:00 2001 From: Lucas Schumacher Date: Wed, 12 Jun 2024 13:07:49 -0400 Subject: [PATCH] Fix memory leak caused by unbounded channels not freeing memory --- .gitignore | 1 + src/app/debug_plot.rs | 31 +++++++++++++++++++++++++------ src/app/fft.rs | 27 +++++++++++++++------------ src/backend/audio.rs | 20 +++++++++++++------- src/backend/mod.rs | 8 ++++---- 5 files changed, 58 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index fbb8e77..dac8998 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target .DS_Store dist +heaptrack.*.zst diff --git a/src/app/debug_plot.rs b/src/app/debug_plot.rs index eff2344..9ade8f2 100644 --- a/src/app/debug_plot.rs +++ b/src/app/debug_plot.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::mpsc::{self, Sender}; +use std::sync::mpsc; use egui::{Context, Ui}; use egui_plot::{Line, Plot, PlotBounds, PlotPoints}; @@ -10,25 +10,44 @@ pub enum PlotData { //F32(Vec), Bode32(Vec), } - +#[derive(Clone)] +pub struct DebugPlotSender { + tx: mpsc::SyncSender<(&'static str, PlotData)>, +} +impl DebugPlotSender { + pub fn send( + &self, + plot_name: &'static str, + plot_data: PlotData, + ) -> Result<(), mpsc::SendError> { + match self.tx.try_send((plot_name, plot_data)) { + Err(mpsc::TrySendError::Full(_)) => { + log::warn!("Debug buffer is full!"); + Ok(()) + } + Err(mpsc::TrySendError::Disconnected((_, d))) => Err(mpsc::SendError(d)), + Ok(()) => Ok(()), + } + } +} pub struct DebugPlots { plots: HashMap<&'static str, PlotData>, plot_en: HashMap<&'static str, bool>, rx: mpsc::Receiver<(&'static str, PlotData)>, - tx: mpsc::Sender<(&'static str, PlotData)>, + tx: DebugPlotSender, } impl DebugPlots { pub fn new() -> Self { - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::sync_channel(128); DebugPlots { plots: HashMap::new(), plot_en: HashMap::new(), rx, - tx, + tx: DebugPlotSender { tx }, } } - pub fn get_sender(&self) -> Sender<(&'static str, PlotData)> { + pub fn get_sender(&self) -> DebugPlotSender { self.tx.clone() } pub fn update_plots(&mut self) { diff --git a/src/app/fft.rs b/src/app/fft.rs index 0648521..c3dc79b 100644 --- a/src/app/fft.rs +++ b/src/app/fft.rs @@ -1,24 +1,21 @@ use anyhow::{anyhow, Result}; use realfft::RealFftPlanner; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError}; -use super::debug_plot::PlotData; +use super::debug_plot::{DebugPlotSender, PlotData}; pub struct Fft { - pub tx: Sender>, + pub tx: SyncSender>, pub output_len: usize, } impl Fft { - pub fn new( - size: usize, - plot_tx: Sender<(&'static str, PlotData)>, - ) -> Result<(Self, mpsc::Receiver>)> { + pub fn new(size: usize, plot_tx: DebugPlotSender) -> Result<(Self, mpsc::Receiver>)> { let output_len = size / 2 + 1; // Create mpsc queue - let (tx, rx) = mpsc::channel(); - let (in_tx, in_rx): (Sender>, Receiver>) = mpsc::channel(); + let (tx, rx) = mpsc::sync_channel(10); + let (in_tx, in_rx): (SyncSender>, Receiver>) = mpsc::sync_channel(10); // Setup fft use f32 for now let mut fft_planner = RealFftPlanner::::new(); @@ -36,7 +33,7 @@ impl Fft { fft.process_with_scratch(&mut fft_in, &mut fft_out, &mut fft_scratch) .unwrap(); plot_tx - .send(("FFT Output", PlotData::Bode32(fft_out.clone()))) + .send("FFT Output", PlotData::Bode32(fft_out.clone())) .unwrap(); fft_in.clear(); let output: Vec = fft_out @@ -48,9 +45,15 @@ impl Fft { .collect(); assert_eq!(output_len, output.len()); plot_tx - .send(("FFT Processed Output", PlotData::U8(output.clone()))) + .send("FFT Processed Output", PlotData::U8(output.clone())) .unwrap(); - tx.send(output).unwrap(); + match tx.try_send(output) { + Ok(_) => {} + Err(TrySendError::Full(_)) => log::warn!("Waterfall buffer full."), + Err(TrySendError::Disconnected(_)) => { + panic!("The fft thread has disconnected from the waterfall!") + } + } } } }); diff --git a/src/backend/audio.rs b/src/backend/audio.rs index a5192e6..9dda17b 100644 --- a/src/backend/audio.rs +++ b/src/backend/audio.rs @@ -4,9 +4,9 @@ use cpal::{ traits::{DeviceTrait, HostTrait}, BufferSize, }; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{SyncSender, TrySendError}; -use crate::app::debug_plot::PlotData; +use crate::app::debug_plot::DebugPlotSender; pub struct Audio { pub stream: cpal::Stream, @@ -15,13 +15,19 @@ impl Audio { pub fn new( device: &cpal::Device, config: cpal::StreamConfig, - fft_input: Sender>, - _plot_tx: Sender<(&'static str, PlotData)>, + fft_input: SyncSender>, + _plot_tx: DebugPlotSender, ) -> Result { let stream = device.build_input_stream( &config, move |data: &[f32], _: &cpal::InputCallbackInfo| { - fft_input.send(data.to_vec()).unwrap(); + match fft_input.try_send(data.to_vec()) { + Err(TrySendError::Disconnected(_)) => panic!( + "Error: Audio backend has lost connection to frontend! Can not continue!" + ), + Err(TrySendError::Full(_)) => log::warn!("Audio Backend buffer full."), + Ok(()) => {} + }; }, move |err| log::error!("Audio Thread Error: {err}"), None, @@ -88,8 +94,8 @@ impl super::Backend for AudioBackend { fn build_device( &mut self, - fft_input: Sender>, - _plot_tx: Sender<(&'static str, PlotData)>, + fft_input: SyncSender>, + _plot_tx: DebugPlotSender, ) -> anyhow::Result> { let config = cpal::StreamConfig { channels: 1, diff --git a/src/backend/mod.rs b/src/backend/mod.rs index d4d1f66..9ee766c 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -1,8 +1,8 @@ -use std::sync::mpsc::Sender; +use std::sync::mpsc::SyncSender; use egui::Ui; -use crate::app::debug_plot::PlotData; +use crate::app::debug_plot::DebugPlotSender; mod audio; pub trait Device { fn show_settings(&mut self, ui: &mut Ui); @@ -14,8 +14,8 @@ pub trait Backend { fn show_device_selection(&mut self, ui: &mut Ui); fn build_device( &mut self, - fft_input: Sender>, - _plot_tx: Sender<(&'static str, PlotData)>, + fft_input: SyncSender>, + _plot_tx: DebugPlotSender, ) -> anyhow::Result>; } pub struct Backends(pub Vec>);