bug: terminate threads, fix blocking poll in input (#262)

Bug fix for improper use of threads, where they were not properly terminated (not really too bad) and the input thread code actually blocked.
This commit is contained in:
Clement Tsang 2020-10-02 02:49:45 -04:00 committed by GitHub
parent a5b95ae8b2
commit ba7738e73e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 142 additions and 67 deletions

View File

@ -2,11 +2,13 @@
"cSpell.words": [ "cSpell.words": [
"Artem", "Artem",
"COPR", "COPR",
"Condvar",
"DWORD", "DWORD",
"Deque", "Deque",
"EINVAL", "EINVAL",
"EPERM", "EPERM",
"ESRCH", "ESRCH",
"Fini",
"GIBI", "GIBI",
"GIBIBYTE", "GIBIBYTE",
"GIGA", "GIGA",
@ -42,6 +44,8 @@
"concat", "concat",
"crossterm", "crossterm",
"curr", "curr",
"cvar",
"cvars",
"czvf", "czvf",
"denylist", "denylist",
"fedoracentos", "fedoracentos",

View File

@ -12,6 +12,7 @@ use std::collections::{hash_map::RandomState, HashMap};
use sysinfo::{ProcessExt, ProcessorExt, System, SystemExt}; use sysinfo::{ProcessExt, ProcessorExt, System, SystemExt};
/// Maximum character length of a /proc/<PID>/stat process name. /// Maximum character length of a /proc/<PID>/stat process name.
#[cfg(target_os = "linux")]
const MAX_STAT_NAME_LEN: usize = 15; const MAX_STAT_NAME_LEN: usize = 15;
// TODO: Add value so we know if it's sorted ascending or descending by default? // TODO: Add value so we know if it's sorted ascending or descending by default?

View File

@ -7,12 +7,11 @@ use bottom::{canvas, constants::*, data_conversion::*, options::*, *};
use std::{ use std::{
boxed::Box, boxed::Box,
ffi::OsStr,
io::{stdout, Write}, io::{stdout, Write},
panic, panic,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc, Arc, mpsc, Arc, Condvar, Mutex,
}, },
thread, thread,
time::Duration, time::Duration,
@ -36,7 +35,10 @@ fn main() -> Result<()> {
} else { } else {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
utils::logging::init_logger(log::LevelFilter::Debug, OsStr::new("debug.log"))?; utils::logging::init_logger(
log::LevelFilter::Debug,
std::ffi::OsStr::new("debug.log"),
)?;
} }
} }
@ -70,32 +72,53 @@ fn main() -> Result<()> {
&config, &config,
)?; )?;
// Create termination mutex and cvar
#[allow(clippy::mutex_atomic)]
let thread_termination_lock = Arc::new(Mutex::new(false));
let thread_termination_cvar = Arc::new(Condvar::new());
// Set up input handling // Set up input handling
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
create_input_thread(sender.clone()); let input_thread = create_input_thread(sender.clone(), thread_termination_lock.clone());
// Cleaning loop // Cleaning loop
{ let cleaning_thread = {
let lock = thread_termination_lock.clone();
let cvar = thread_termination_cvar.clone();
let cleaning_sender = sender.clone(); let cleaning_sender = sender.clone();
trace!("Initializing cleaning thread..."); trace!("Initializing cleaning thread...");
thread::spawn(move || loop { thread::spawn(move || {
thread::sleep(Duration::from_millis( loop {
constants::STALE_MAX_MILLISECONDS + 5000, let result = cvar.wait_timeout(
)); lock.lock().unwrap(),
trace!("Sending cleaning signal..."); Duration::from_millis(constants::STALE_MAX_MILLISECONDS + 5000),
if cleaning_sender.send(BottomEvent::Clean).is_err() { );
trace!("Failed to send cleaning signal. Halting cleaning thread loop."); if let Ok(result) = result {
break; if *(result.0) {
trace!("Received termination lock in cleaning thread from cvar!");
break;
}
} else {
trace!("Sending cleaning signal...");
if cleaning_sender.send(BottomEvent::Clean).is_err() {
trace!("Failed to send cleaning signal. Halting cleaning thread loop.");
break;
}
trace!("Cleaning signal sent without errors.");
}
} }
trace!("Cleaning signal sent without errors.");
}); trace!("Cleaning thread loop has closed.");
} })
};
// Event loop // Event loop
let (reset_sender, reset_receiver) = mpsc::channel(); let (collection_thread_ctrl_sender, collection_thread_ctrl_receiver) = mpsc::channel();
create_collection_thread( let collection_thread = create_collection_thread(
sender, sender,
reset_receiver, collection_thread_ctrl_receiver,
thread_termination_lock.clone(),
thread_termination_cvar.clone(),
&app.app_config_fields, &app.app_config_fields,
app.used_widgets.clone(), app.used_widgets.clone(),
); );
@ -117,7 +140,6 @@ fn main() -> Result<()> {
let ist_clone = is_terminated.clone(); let ist_clone = is_terminated.clone();
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
ist_clone.store(true, Ordering::SeqCst); ist_clone.store(true, Ordering::SeqCst);
termination_hook();
})?; })?;
let mut first_run = true; let mut first_run = true;
@ -127,12 +149,12 @@ fn main() -> Result<()> {
if let BottomEvent::Update(_) = recv { if let BottomEvent::Update(_) = recv {
trace!("Main/drawing thread received Update event."); trace!("Main/drawing thread received Update event.");
} else { } else {
trace!("Main/drawing thread received event: {:#?}", recv); trace!("Main/drawing thread received event: {:?}", recv);
} }
} }
match recv { match recv {
BottomEvent::KeyInput(event) => { BottomEvent::KeyInput(event) => {
if handle_key_event_or_break(event, &mut app, &reset_sender) { if handle_key_event_or_break(event, &mut app, &collection_thread_ctrl_sender) {
break; break;
} }
handle_force_redraws(&mut app); handle_force_redraws(&mut app);
@ -227,7 +249,19 @@ fn main() -> Result<()> {
try_drawing(&mut terminal, &mut app, &mut painter, is_debug)?; try_drawing(&mut terminal, &mut app, &mut painter, is_debug)?;
} }
trace!("Main/drawing thread is cleaning up."); // I think doing it in this order is safe...
trace!("Send termination thread locks.");
*thread_termination_lock.lock().unwrap() = true;
trace!("Notifying all cvars.");
thread_termination_cvar.notify_all();
cleanup_terminal(&mut terminal, is_debug)?; cleanup_terminal(&mut terminal, is_debug)?;
trace!("Main/drawing thread is cleaning up.");
cleaning_thread.join().unwrap();
input_thread.join().unwrap();
collection_thread.join().unwrap();
trace!("Fini.");
Ok(()) Ok(())
} }

View File

@ -9,6 +9,9 @@ use std::{
io::{stdout, Write}, io::{stdout, Write},
panic::PanicInfo, panic::PanicInfo,
path::PathBuf, path::PathBuf,
sync::Arc,
sync::Condvar,
sync::Mutex,
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -57,7 +60,7 @@ pub enum BottomEvent<I, J> {
} }
#[derive(Debug)] #[derive(Debug)]
pub enum CollectionThreadEvent { pub enum ThreadControlEvent {
Reset, Reset,
UpdateConfig(Box<app::AppConfigFields>), UpdateConfig(Box<app::AppConfigFields>),
UpdateUsedWidgets(Box<UsedWidgets>), UpdateUsedWidgets(Box<UsedWidgets>),
@ -87,7 +90,7 @@ pub fn handle_mouse_event(event: MouseEvent, app: &mut App) {
} }
pub fn handle_key_event_or_break( pub fn handle_key_event_or_break(
event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender<CollectionThreadEvent>, event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender<ThreadControlEvent>,
) -> bool { ) -> bool {
// debug!("KeyEvent: {:?}", event); // debug!("KeyEvent: {:?}", event);
@ -144,7 +147,7 @@ pub fn handle_key_event_or_break(
KeyCode::Up => app.move_widget_selection(&WidgetDirection::Up), KeyCode::Up => app.move_widget_selection(&WidgetDirection::Up),
KeyCode::Down => app.move_widget_selection(&WidgetDirection::Down), KeyCode::Down => app.move_widget_selection(&WidgetDirection::Down),
KeyCode::Char('r') => { KeyCode::Char('r') => {
if reset_sender.send(CollectionThreadEvent::Reset).is_ok() { if reset_sender.send(ThreadControlEvent::Reset).is_ok() {
app.reset(); app.reset();
} }
} }
@ -262,12 +265,6 @@ pub fn cleanup_terminal(
Ok(()) Ok(())
} }
pub fn termination_hook() {
let mut stdout = stdout();
disable_raw_mode().unwrap();
execute!(stdout, DisableMouseCapture, LeaveAlternateScreen).unwrap();
}
/// Based on https://github.com/Rigellute/spotify-tui/blob/master/src/main.rs /// Based on https://github.com/Rigellute/spotify-tui/blob/master/src/main.rs
pub fn panic_hook(panic_info: &PanicInfo<'_>) { pub fn panic_hook(panic_info: &PanicInfo<'_>) {
let mut stdout = stdout(); let mut stdout = stdout();
@ -564,7 +561,8 @@ pub fn create_input_thread(
sender: std::sync::mpsc::Sender< sender: std::sync::mpsc::Sender<
BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>, BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>,
>, >,
) { termination_ctrl_lock: Arc<Mutex<bool>>,
) -> std::thread::JoinHandle<()> {
trace!("Creating input thread."); trace!("Creating input thread.");
thread::spawn(move || { thread::spawn(move || {
trace!("Spawned input thread."); trace!("Spawned input thread.");
@ -572,40 +570,51 @@ pub fn create_input_thread(
let mut keyboard_timer = Instant::now(); let mut keyboard_timer = Instant::now();
loop { loop {
trace!("Waiting for an input event..."); if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
if poll(Duration::from_millis(20)).is_ok() { // We don't block.
if let Ok(event) = read() { if *is_terminated {
trace!("Input thread received an event: {:?}", event); trace!("Received termination lock in input thread!");
if let Event::Key(key) = event { drop(is_terminated);
if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 { break;
if sender.send(BottomEvent::KeyInput(key)).is_err() { }
break; }
if let Ok(poll) = poll(Duration::from_millis(20)) {
if poll {
if let Ok(event) = read() {
trace!("Input thread received an event: {:?}", event);
if let Event::Key(key) = event {
if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 {
if sender.send(BottomEvent::KeyInput(key)).is_err() {
break;
}
trace!("Input thread sent keyboard data.");
keyboard_timer = Instant::now();
} }
trace!("Input thread sent data."); } else if let Event::Mouse(mouse) = event {
keyboard_timer = Instant::now(); if Instant::now().duration_since(mouse_timer).as_millis() >= 20 {
} if sender.send(BottomEvent::MouseInput(mouse)).is_err() {
} else if let Event::Mouse(mouse) = event { break;
if Instant::now().duration_since(mouse_timer).as_millis() >= 20 { }
if sender.send(BottomEvent::MouseInput(mouse)).is_err() { trace!("Input thread sent mouse data.");
break; mouse_timer = Instant::now();
} }
trace!("Input thread sent data.");
mouse_timer = Instant::now();
} }
} }
} }
} }
} }
}); trace!("Input thread loop has closed.");
})
} }
pub fn create_collection_thread( pub fn create_collection_thread(
sender: std::sync::mpsc::Sender< sender: std::sync::mpsc::Sender<
BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>, BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>,
>, >,
reset_receiver: std::sync::mpsc::Receiver<CollectionThreadEvent>, control_receiver: std::sync::mpsc::Receiver<ThreadControlEvent>,
termination_ctrl_lock: Arc<Mutex<bool>>, termination_ctrl_cvar: Arc<Condvar>,
app_config_fields: &app::AppConfigFields, used_widget_set: UsedWidgets, app_config_fields: &app::AppConfigFields, used_widget_set: UsedWidgets,
) { ) -> std::thread::JoinHandle<()> {
trace!("Creating collection thread."); trace!("Creating collection thread.");
let temp_type = app_config_fields.temperature_type.clone(); let temp_type = app_config_fields.temperature_type.clone();
let use_current_cpu_total = app_config_fields.use_current_cpu_total; let use_current_cpu_total = app_config_fields.use_current_cpu_total;
@ -617,50 +626,77 @@ pub fn create_collection_thread(
let mut data_state = data_harvester::DataCollector::default(); let mut data_state = data_harvester::DataCollector::default();
trace!("Created initial data state."); trace!("Created initial data state.");
data_state.set_collected_data(used_widget_set); data_state.set_collected_data(used_widget_set);
trace!("Set collected data.");
data_state.set_temperature_type(temp_type); data_state.set_temperature_type(temp_type);
trace!("Set initial temp type.");
data_state.set_use_current_cpu_total(use_current_cpu_total); data_state.set_use_current_cpu_total(use_current_cpu_total);
trace!("Set current CPU total.");
data_state.set_show_average_cpu(show_average_cpu); data_state.set_show_average_cpu(show_average_cpu);
trace!("Set showing average CPU.");
data_state.init(); data_state.init();
trace!("Data state is now fully initialized."); trace!("Data state is now fully initialized.");
loop { loop {
trace!("Collecting..."); // Check once at the very top...
if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
// We don't block here.
if *is_terminated {
trace!("Received termination lock in collection thread!");
drop(is_terminated);
break;
}
}
trace!("Checking for collection control receiver event...");
let mut update_time = update_rate_in_milliseconds; let mut update_time = update_rate_in_milliseconds;
if let Ok(message) = reset_receiver.try_recv() { if let Ok(message) = control_receiver.try_recv() {
trace!("Received message in collection thread: {:?}", message); trace!("Received message in collection thread: {:?}", message);
match message { match message {
CollectionThreadEvent::Reset => { ThreadControlEvent::Reset => {
data_state.data.cleanup(); data_state.data.cleanup();
} }
CollectionThreadEvent::UpdateConfig(app_config_fields) => { ThreadControlEvent::UpdateConfig(app_config_fields) => {
data_state.set_temperature_type(app_config_fields.temperature_type.clone()); data_state.set_temperature_type(app_config_fields.temperature_type.clone());
data_state data_state
.set_use_current_cpu_total(app_config_fields.use_current_cpu_total); .set_use_current_cpu_total(app_config_fields.use_current_cpu_total);
data_state.set_show_average_cpu(app_config_fields.show_average_cpu); data_state.set_show_average_cpu(app_config_fields.show_average_cpu);
} }
CollectionThreadEvent::UpdateUsedWidgets(used_widget_set) => { ThreadControlEvent::UpdateUsedWidgets(used_widget_set) => {
data_state.set_collected_data(*used_widget_set); data_state.set_collected_data(*used_widget_set);
} }
CollectionThreadEvent::UpdateUpdateTime(new_time) => { ThreadControlEvent::UpdateUpdateTime(new_time) => {
update_time = new_time; update_time = new_time;
} }
} }
} }
futures::executor::block_on(data_state.update_data()); futures::executor::block_on(data_state.update_data());
trace!("Collection thread is updating...");
// Yet another check to bail if needed...
if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
// We don't block here.
if *is_terminated {
trace!("Received termination lock in collection thread!");
drop(is_terminated);
break;
}
}
trace!("Collection thread is updating and sending...");
let event = BottomEvent::Update(Box::from(data_state.data)); let event = BottomEvent::Update(Box::from(data_state.data));
trace!("Collection thread done updating. Sending data now...");
data_state.data = data_harvester::Data::default(); data_state.data = data_harvester::Data::default();
if sender.send(event).is_err() { if sender.send(event).is_err() {
trace!("Error sending from collection thread..."); trace!("Error sending from collection thread...");
break; break;
} }
trace!("No problem sending from collection thread!"); trace!("No problem sending from collection thread!");
thread::sleep(Duration::from_millis(update_time));
if let Ok((is_terminated, _wait_timeout_result)) = termination_ctrl_cvar.wait_timeout(
termination_ctrl_lock.lock().unwrap(),
Duration::from_millis(update_time),
) {
if *is_terminated {
trace!("Received termination lock in collection thread from cvar!");
drop(is_terminated);
break;
}
}
} }
}); trace!("Collection thread loop has closed.");
})
} }