From ba7738e73ec46548210b96ddfbf52a2773c31168 Mon Sep 17 00:00:00 2001 From: Clement Tsang <34804052+ClementTsang@users.noreply.github.com> Date: Fri, 2 Oct 2020 02:49:45 -0400 Subject: [PATCH] bug: terminate threads, fix blocking poll in input (#262) Bug fix for improper use of threads, where they were not properly terminated (not really too bad) and the input thread code actually blocked. --- .vscode/settings.json | 4 + src/app/data_harvester/processes.rs | 1 + src/bin/main.rs | 80 ++++++++++++------ src/lib.rs | 124 ++++++++++++++++++---------- 4 files changed, 142 insertions(+), 67 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index da6cd6b9..dd8c5e7a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,11 +2,13 @@ "cSpell.words": [ "Artem", "COPR", + "Condvar", "DWORD", "Deque", "EINVAL", "EPERM", "ESRCH", + "Fini", "GIBI", "GIBIBYTE", "GIGA", @@ -42,6 +44,8 @@ "concat", "crossterm", "curr", + "cvar", + "cvars", "czvf", "denylist", "fedoracentos", diff --git a/src/app/data_harvester/processes.rs b/src/app/data_harvester/processes.rs index 62789146..ce104082 100644 --- a/src/app/data_harvester/processes.rs +++ b/src/app/data_harvester/processes.rs @@ -12,6 +12,7 @@ use std::collections::{hash_map::RandomState, HashMap}; use sysinfo::{ProcessExt, ProcessorExt, System, SystemExt}; /// Maximum character length of a /proc//stat process name. +#[cfg(target_os = "linux")] const MAX_STAT_NAME_LEN: usize = 15; // TODO: Add value so we know if it's sorted ascending or descending by default? diff --git a/src/bin/main.rs b/src/bin/main.rs index 50881529..1df99ab4 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -7,12 +7,11 @@ use bottom::{canvas, constants::*, data_conversion::*, options::*, *}; use std::{ boxed::Box, - ffi::OsStr, io::{stdout, Write}, panic, sync::{ atomic::{AtomicBool, Ordering}, - mpsc, Arc, + mpsc, Arc, Condvar, Mutex, }, thread, time::Duration, @@ -36,7 +35,10 @@ fn main() -> Result<()> { } else { #[cfg(debug_assertions)] { - utils::logging::init_logger(log::LevelFilter::Debug, OsStr::new("debug.log"))?; + utils::logging::init_logger( + log::LevelFilter::Debug, + std::ffi::OsStr::new("debug.log"), + )?; } } @@ -70,32 +72,53 @@ fn main() -> Result<()> { &config, )?; + // Create termination mutex and cvar + #[allow(clippy::mutex_atomic)] + let thread_termination_lock = Arc::new(Mutex::new(false)); + let thread_termination_cvar = Arc::new(Condvar::new()); + // Set up input handling let (sender, receiver) = mpsc::channel(); - create_input_thread(sender.clone()); + let input_thread = create_input_thread(sender.clone(), thread_termination_lock.clone()); // Cleaning loop - { + let cleaning_thread = { + let lock = thread_termination_lock.clone(); + let cvar = thread_termination_cvar.clone(); let cleaning_sender = sender.clone(); trace!("Initializing cleaning thread..."); - thread::spawn(move || loop { - thread::sleep(Duration::from_millis( - constants::STALE_MAX_MILLISECONDS + 5000, - )); - trace!("Sending cleaning signal..."); - if cleaning_sender.send(BottomEvent::Clean).is_err() { - trace!("Failed to send cleaning signal. Halting cleaning thread loop."); - break; + thread::spawn(move || { + loop { + let result = cvar.wait_timeout( + lock.lock().unwrap(), + Duration::from_millis(constants::STALE_MAX_MILLISECONDS + 5000), + ); + if let Ok(result) = result { + if *(result.0) { + trace!("Received termination lock in cleaning thread from cvar!"); + break; + } + } else { + trace!("Sending cleaning signal..."); + if cleaning_sender.send(BottomEvent::Clean).is_err() { + trace!("Failed to send cleaning signal. Halting cleaning thread loop."); + break; + } + trace!("Cleaning signal sent without errors."); + } } - trace!("Cleaning signal sent without errors."); - }); - } + + trace!("Cleaning thread loop has closed."); + }) + }; // Event loop - let (reset_sender, reset_receiver) = mpsc::channel(); - create_collection_thread( + let (collection_thread_ctrl_sender, collection_thread_ctrl_receiver) = mpsc::channel(); + let collection_thread = create_collection_thread( sender, - reset_receiver, + collection_thread_ctrl_receiver, + thread_termination_lock.clone(), + thread_termination_cvar.clone(), &app.app_config_fields, app.used_widgets.clone(), ); @@ -117,7 +140,6 @@ fn main() -> Result<()> { let ist_clone = is_terminated.clone(); ctrlc::set_handler(move || { ist_clone.store(true, Ordering::SeqCst); - termination_hook(); })?; let mut first_run = true; @@ -127,12 +149,12 @@ fn main() -> Result<()> { if let BottomEvent::Update(_) = recv { trace!("Main/drawing thread received Update event."); } else { - trace!("Main/drawing thread received event: {:#?}", recv); + trace!("Main/drawing thread received event: {:?}", recv); } } match recv { BottomEvent::KeyInput(event) => { - if handle_key_event_or_break(event, &mut app, &reset_sender) { + if handle_key_event_or_break(event, &mut app, &collection_thread_ctrl_sender) { break; } handle_force_redraws(&mut app); @@ -227,7 +249,19 @@ fn main() -> Result<()> { try_drawing(&mut terminal, &mut app, &mut painter, is_debug)?; } - trace!("Main/drawing thread is cleaning up."); + // I think doing it in this order is safe... + trace!("Send termination thread locks."); + *thread_termination_lock.lock().unwrap() = true; + trace!("Notifying all cvars."); + thread_termination_cvar.notify_all(); + cleanup_terminal(&mut terminal, is_debug)?; + + trace!("Main/drawing thread is cleaning up."); + + cleaning_thread.join().unwrap(); + input_thread.join().unwrap(); + collection_thread.join().unwrap(); + trace!("Fini."); Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 605b79b7..54fdb35b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,9 @@ use std::{ io::{stdout, Write}, panic::PanicInfo, path::PathBuf, + sync::Arc, + sync::Condvar, + sync::Mutex, thread, time::{Duration, Instant}, }; @@ -57,7 +60,7 @@ pub enum BottomEvent { } #[derive(Debug)] -pub enum CollectionThreadEvent { +pub enum ThreadControlEvent { Reset, UpdateConfig(Box), UpdateUsedWidgets(Box), @@ -87,7 +90,7 @@ pub fn handle_mouse_event(event: MouseEvent, app: &mut App) { } pub fn handle_key_event_or_break( - event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender, + event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender, ) -> bool { // debug!("KeyEvent: {:?}", event); @@ -144,7 +147,7 @@ pub fn handle_key_event_or_break( KeyCode::Up => app.move_widget_selection(&WidgetDirection::Up), KeyCode::Down => app.move_widget_selection(&WidgetDirection::Down), KeyCode::Char('r') => { - if reset_sender.send(CollectionThreadEvent::Reset).is_ok() { + if reset_sender.send(ThreadControlEvent::Reset).is_ok() { app.reset(); } } @@ -262,12 +265,6 @@ pub fn cleanup_terminal( Ok(()) } -pub fn termination_hook() { - let mut stdout = stdout(); - disable_raw_mode().unwrap(); - execute!(stdout, DisableMouseCapture, LeaveAlternateScreen).unwrap(); -} - /// Based on https://github.com/Rigellute/spotify-tui/blob/master/src/main.rs pub fn panic_hook(panic_info: &PanicInfo<'_>) { let mut stdout = stdout(); @@ -564,7 +561,8 @@ pub fn create_input_thread( sender: std::sync::mpsc::Sender< BottomEvent, >, -) { + termination_ctrl_lock: Arc>, +) -> std::thread::JoinHandle<()> { trace!("Creating input thread."); thread::spawn(move || { trace!("Spawned input thread."); @@ -572,40 +570,51 @@ pub fn create_input_thread( let mut keyboard_timer = Instant::now(); loop { - trace!("Waiting for an input event..."); - if poll(Duration::from_millis(20)).is_ok() { - if let Ok(event) = read() { - trace!("Input thread received an event: {:?}", event); - if let Event::Key(key) = event { - if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 { - if sender.send(BottomEvent::KeyInput(key)).is_err() { - break; + if let Ok(is_terminated) = termination_ctrl_lock.try_lock() { + // We don't block. + if *is_terminated { + trace!("Received termination lock in input thread!"); + drop(is_terminated); + break; + } + } + if let Ok(poll) = poll(Duration::from_millis(20)) { + if poll { + if let Ok(event) = read() { + trace!("Input thread received an event: {:?}", event); + if let Event::Key(key) = event { + if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 { + if sender.send(BottomEvent::KeyInput(key)).is_err() { + break; + } + trace!("Input thread sent keyboard data."); + keyboard_timer = Instant::now(); } - trace!("Input thread sent data."); - keyboard_timer = Instant::now(); - } - } else if let Event::Mouse(mouse) = event { - if Instant::now().duration_since(mouse_timer).as_millis() >= 20 { - if sender.send(BottomEvent::MouseInput(mouse)).is_err() { - break; + } else if let Event::Mouse(mouse) = event { + if Instant::now().duration_since(mouse_timer).as_millis() >= 20 { + if sender.send(BottomEvent::MouseInput(mouse)).is_err() { + break; + } + trace!("Input thread sent mouse data."); + mouse_timer = Instant::now(); } - trace!("Input thread sent data."); - mouse_timer = Instant::now(); } } } } } - }); + trace!("Input thread loop has closed."); + }) } pub fn create_collection_thread( sender: std::sync::mpsc::Sender< BottomEvent, >, - reset_receiver: std::sync::mpsc::Receiver, + control_receiver: std::sync::mpsc::Receiver, + termination_ctrl_lock: Arc>, termination_ctrl_cvar: Arc, app_config_fields: &app::AppConfigFields, used_widget_set: UsedWidgets, -) { +) -> std::thread::JoinHandle<()> { trace!("Creating collection thread."); let temp_type = app_config_fields.temperature_type.clone(); let use_current_cpu_total = app_config_fields.use_current_cpu_total; @@ -617,50 +626,77 @@ pub fn create_collection_thread( let mut data_state = data_harvester::DataCollector::default(); trace!("Created initial data state."); data_state.set_collected_data(used_widget_set); - trace!("Set collected data."); data_state.set_temperature_type(temp_type); - trace!("Set initial temp type."); data_state.set_use_current_cpu_total(use_current_cpu_total); - trace!("Set current CPU total."); data_state.set_show_average_cpu(show_average_cpu); - trace!("Set showing average CPU."); data_state.init(); trace!("Data state is now fully initialized."); loop { - trace!("Collecting..."); + // Check once at the very top... + if let Ok(is_terminated) = termination_ctrl_lock.try_lock() { + // We don't block here. + if *is_terminated { + trace!("Received termination lock in collection thread!"); + drop(is_terminated); + break; + } + } + + trace!("Checking for collection control receiver event..."); let mut update_time = update_rate_in_milliseconds; - if let Ok(message) = reset_receiver.try_recv() { + if let Ok(message) = control_receiver.try_recv() { trace!("Received message in collection thread: {:?}", message); match message { - CollectionThreadEvent::Reset => { + ThreadControlEvent::Reset => { data_state.data.cleanup(); } - CollectionThreadEvent::UpdateConfig(app_config_fields) => { + ThreadControlEvent::UpdateConfig(app_config_fields) => { data_state.set_temperature_type(app_config_fields.temperature_type.clone()); data_state .set_use_current_cpu_total(app_config_fields.use_current_cpu_total); data_state.set_show_average_cpu(app_config_fields.show_average_cpu); } - CollectionThreadEvent::UpdateUsedWidgets(used_widget_set) => { + ThreadControlEvent::UpdateUsedWidgets(used_widget_set) => { data_state.set_collected_data(*used_widget_set); } - CollectionThreadEvent::UpdateUpdateTime(new_time) => { + ThreadControlEvent::UpdateUpdateTime(new_time) => { update_time = new_time; } } } futures::executor::block_on(data_state.update_data()); - trace!("Collection thread is updating..."); + + // Yet another check to bail if needed... + if let Ok(is_terminated) = termination_ctrl_lock.try_lock() { + // We don't block here. + if *is_terminated { + trace!("Received termination lock in collection thread!"); + drop(is_terminated); + break; + } + } + + trace!("Collection thread is updating and sending..."); let event = BottomEvent::Update(Box::from(data_state.data)); - trace!("Collection thread done updating. Sending data now..."); data_state.data = data_harvester::Data::default(); if sender.send(event).is_err() { trace!("Error sending from collection thread..."); break; } trace!("No problem sending from collection thread!"); - thread::sleep(Duration::from_millis(update_time)); + + if let Ok((is_terminated, _wait_timeout_result)) = termination_ctrl_cvar.wait_timeout( + termination_ctrl_lock.lock().unwrap(), + Duration::from_millis(update_time), + ) { + if *is_terminated { + trace!("Received termination lock in collection thread from cvar!"); + drop(is_terminated); + break; + } + } } - }); + trace!("Collection thread loop has closed."); + }) }