Merge pull request #8 from ClementTsang/multithread_collection

Multithread collection
This commit is contained in:
Clement Tsang 2020-02-15 16:29:23 -05:00 committed by GitHub
commit a7594efbe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 72 deletions

View File

@ -23,6 +23,7 @@ lto = true
[dependencies] [dependencies]
chrono = "0.4.10" chrono = "0.4.10"
clap = "2.33.0" clap = "2.33.0"
crossbeam = "0.7.3"
fern = "0.5.9" fern = "0.5.9"
futures-timer = "3.0.1" futures-timer = "3.0.1"
futures = "0.3.4" futures = "0.3.4"

View File

@ -114,48 +114,8 @@ impl DataState {
let current_instant = std::time::Instant::now(); let current_instant = std::time::Instant::now();
// TODO: [OPT] MT/Async the harvesting step. debug!("Start....");
// Network
self.data.network = network::get_network_data(
&self.sys,
self.last_collection_time,
&mut self.total_rx,
&mut self.total_tx,
current_instant,
)
.await;
self.total_rx = self.data.network.total_rx;
self.total_tx = self.data.network.total_tx;
// Mem and swap
if let Ok(memory) = mem::get_mem_data_list().await {
self.data.memory = memory;
}
if let Ok(swap) = mem::get_swap_data_list().await {
self.data.swap = swap;
}
// CPU
self.data.cpu = cpu::get_cpu_data_list(&self.sys); self.data.cpu = cpu::get_cpu_data_list(&self.sys);
// Disks
if let Ok(disks) = disks::get_disk_usage_list().await {
self.data.disks = disks;
}
if let Ok(io) = disks::get_io_usage_list(false).await {
self.data.io = io;
}
// Temp
if let Ok(temp) = temperature::get_temperature_data(&self.sys, &self.temperature_type).await
{
self.data.temperature_sensors = temp;
}
// What we want to do: For timed data, if there is an error, just do not add. For other data, just don't update!
if let Ok(process_list) = processes::get_sorted_processes_list( if let Ok(process_list) = processes::get_sorted_processes_list(
&self.sys, &self.sys,
&mut self.prev_idle, &mut self.prev_idle,
@ -168,6 +128,55 @@ impl DataState {
self.data.list_of_processes = process_list; self.data.list_of_processes = process_list;
} }
// ASYNC
let network_data_fut = network::get_network_data(
&self.sys,
self.last_collection_time,
&mut self.total_rx,
&mut self.total_tx,
current_instant,
);
let mem_data_fut = mem::get_mem_data_list();
let swap_data_fut = mem::get_swap_data_list();
let disk_data_fut = disks::get_disk_usage_list();
let disk_io_usage_fut = disks::get_io_usage_list(false);
let temp_data_fut = temperature::get_temperature_data(&self.sys, &self.temperature_type);
let (net_data, mem_res, swap_res, disk_res, io_res, temp_res) = join!(
network_data_fut,
mem_data_fut,
swap_data_fut,
disk_data_fut,
disk_io_usage_fut,
temp_data_fut
);
// After async
self.data.network = net_data;
self.total_rx = self.data.network.total_rx;
self.total_tx = self.data.network.total_tx;
if let Ok(memory) = mem_res {
self.data.memory = memory;
}
if let Ok(swap) = swap_res {
self.data.swap = swap;
}
if let Ok(disks) = disk_res {
self.data.disks = disks;
}
if let Ok(io) = io_res {
self.data.io = io;
}
if let Ok(temp) = temp_res {
self.data.temperature_sensors = temp;
}
debug!("End....");
// Update time // Update time
self.data.last_collection_time = current_instant; self.data.last_collection_time = current_instant;
self.last_collection_time = current_instant; self.last_collection_time = current_instant;

View File

@ -116,10 +116,9 @@ fn get_process_cpu_stats(pid: u32) -> std::io::Result<f64> {
/// Note that cpu_fraction should be represented WITHOUT the \times 100 factor! /// Note that cpu_fraction should be represented WITHOUT the \times 100 factor!
fn linux_cpu_usage<S: core::hash::BuildHasher>( fn linux_cpu_usage<S: core::hash::BuildHasher>(
pid: u32, cpu_usage: f64, cpu_fraction: f64, pid: u32, cpu_usage: f64, cpu_fraction: f64,
prev_pid_stats: &HashMap<String, (f64, Instant), S>, prev_pid_stats: &HashMap<String, (f64, Instant), S>, use_current_cpu_total: bool,
new_pid_stats: &mut HashMap<String, (f64, Instant), S>, use_current_cpu_total: bool,
curr_time: Instant, curr_time: Instant,
) -> std::io::Result<f64> { ) -> std::io::Result<(f64, (String, (f64, Instant)))> {
// Based heavily on https://stackoverflow.com/a/23376195 and https://stackoverflow.com/a/1424556 // Based heavily on https://stackoverflow.com/a/23376195 and https://stackoverflow.com/a/1424556
let before_proc_val: f64 = if prev_pid_stats.contains_key(&pid.to_string()) { let before_proc_val: f64 = if prev_pid_stats.contains_key(&pid.to_string()) {
prev_pid_stats prev_pid_stats
@ -140,27 +139,36 @@ fn linux_cpu_usage<S: core::hash::BuildHasher>(
(after_proc_val - before_proc_val) / cpu_usage * 100_f64 (after_proc_val - before_proc_val) / cpu_usage * 100_f64
);*/ );*/
new_pid_stats.insert(pid.to_string(), (after_proc_val, curr_time)); let new_dict_entry = (pid.to_string(), (after_proc_val, curr_time));
if use_current_cpu_total { if use_current_cpu_total {
Ok((after_proc_val - before_proc_val) / cpu_usage * 100_f64) Ok((
(after_proc_val - before_proc_val) / cpu_usage * 100_f64,
new_dict_entry,
))
} else { } else {
Ok((after_proc_val - before_proc_val) / cpu_usage * 100_f64 * cpu_fraction) Ok((
(after_proc_val - before_proc_val) / cpu_usage * 100_f64 * cpu_fraction,
new_dict_entry,
))
} }
} }
fn convert_ps<S: core::hash::BuildHasher>( fn convert_ps<S: core::hash::BuildHasher>(
process: &str, cpu_usage: f64, cpu_fraction: f64, process: &str, cpu_usage: f64, cpu_fraction: f64,
prev_pid_stats: &HashMap<String, (f64, Instant), S>, prev_pid_stats: &HashMap<String, (f64, Instant), S>, use_current_cpu_total: bool,
new_pid_stats: &mut HashMap<String, (f64, Instant), S>, use_current_cpu_total: bool,
curr_time: Instant, curr_time: Instant,
) -> std::io::Result<ProcessHarvest> { ) -> std::io::Result<(ProcessHarvest, (String, (f64, Instant)))> {
if process.trim().to_string().is_empty() { if process.trim().to_string().is_empty() {
return Ok(ProcessHarvest { let dummy_result = (String::default(), (0.0, Instant::now()));
pid: 0, return Ok((
name: "".to_string(), ProcessHarvest {
mem_usage_percent: 0.0, pid: 0,
cpu_usage_percent: 0.0, name: "".to_string(),
}); mem_usage_percent: 0.0,
cpu_usage_percent: 0.0,
},
dummy_result,
));
} }
let pid = (&process[..11]) let pid = (&process[..11])
@ -175,20 +183,23 @@ fn convert_ps<S: core::hash::BuildHasher>(
.parse::<f64>() .parse::<f64>()
.unwrap_or(0_f64); .unwrap_or(0_f64);
Ok(ProcessHarvest { let (cpu_usage_percent, new_entry) = linux_cpu_usage(
pid, pid,
name, cpu_usage,
mem_usage_percent, cpu_fraction,
cpu_usage_percent: linux_cpu_usage( prev_pid_stats,
use_current_cpu_total,
curr_time,
)?;
Ok((
ProcessHarvest {
pid, pid,
cpu_usage, name,
cpu_fraction, mem_usage_percent,
prev_pid_stats, cpu_usage_percent: cpu_usage_percent,
new_pid_stats, },
use_current_cpu_total, new_entry,
curr_time, ))
)?,
})
} }
pub fn get_sorted_processes_list( pub fn get_sorted_processes_list(
@ -199,8 +210,6 @@ pub fn get_sorted_processes_list(
let mut process_vector: Vec<ProcessHarvest> = Vec::new(); let mut process_vector: Vec<ProcessHarvest> = Vec::new();
if cfg!(target_os = "linux") { if cfg!(target_os = "linux") {
// Linux specific - this is a massive pain... ugh.
let ps_result = Command::new("ps") let ps_result = Command::new("ps")
.args(&["-axo", "pid:10,comm:50,%mem:5", "--noheader"]) .args(&["-axo", "pid:10,comm:50,%mem:5", "--noheader"])
.output()?; .output()?;
@ -213,18 +222,19 @@ pub fn get_sorted_processes_list(
let mut new_pid_stats: HashMap<String, (f64, Instant), RandomState> = HashMap::new(); let mut new_pid_stats: HashMap<String, (f64, Instant), RandomState> = HashMap::new();
for process in process_stream { for process in process_stream {
if let Ok(process_object) = convert_ps( if let Ok((process_object, new_entry)) = convert_ps(
process, process,
cpu_usage, cpu_usage,
cpu_fraction, cpu_fraction,
&prev_pid_stats, &prev_pid_stats,
&mut new_pid_stats,
use_current_cpu_total, use_current_cpu_total,
curr_time, curr_time,
) { ) {
if !process_object.name.is_empty() { if !process_object.name.is_empty() {
process_vector.push(process_object); process_vector.push(process_object);
} }
new_pid_stats.insert(new_entry.0, new_entry.1);
} }
} }

View File

@ -4,6 +4,8 @@ extern crate log;
extern crate clap; extern crate clap;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
#[macro_use]
extern crate futures;
use serde::Deserialize; use serde::Deserialize;