diff --git a/src/app.rs b/src/app.rs
index 4049ede6..3b73fda7 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -1,4 +1,4 @@
-pub mod data_farmer;
+pub mod data;
 pub mod filter;
 pub mod frozen_state;
 pub mod layout_manager;
@@ -12,7 +12,7 @@ use std::{
 use anyhow::bail;
 use concat_string::concat_string;
-use data_farmer::*;
+use data::*;
 use filter::*;
 use frozen_state::FrozenState;
 use hashbrown::HashMap;
diff --git a/src/app/data_farmer.rs b/src/app/data.rs
similarity index 52%
rename from src/app/data_farmer.rs
rename to src/app/data.rs
index 200a3a81..0e88c1ce 100644
--- a/src/app/data_farmer.rs
+++ b/src/app/data.rs
@@ -1,19 +1,10 @@
-//! In charge of cleaning, processing, and managing data. I couldn't think of
-//! a better name for the file. Since I called data collection "harvesting",
-//! then this is the farmer I guess.
-//!
-//! Essentially the main goal is to shift the initial calculation and
-//! distribution of joiner points and data to one central location that will
-//! only do it *once* upon receiving the data --- as opposed to doing it on
-//! canvas draw, which will be a costly process.
-//!
-//! This will also handle the *cleaning* of stale data. That should be done
-//! in some manner (timer on another thread, some loop) that will occasionally
-//! call the purging function. Failure to do so *will* result in a growing
-//! memory usage and higher CPU usage - you will be trying to process more and
-//! more points as this is used!
+//! In charge of cleaning, processing, and managing data.
 
-use std::{collections::BTreeMap, time::Instant, vec::Vec};
+use std::{
+    collections::BTreeMap,
+    time::{Duration, Instant},
+    vec::Vec,
+};
 
 use hashbrown::HashMap;
 
@@ -28,6 +19,254 @@ use crate::{
     dec_bytes_per_second_string,
 };
 
+/// A chunk of data, corresponding to the indices of the time slice.
+#[derive(Debug)]
+pub struct DataChunk {
+    /// The start offset of this chunk; should correspond to `time_offsets`. If that updates,
+    /// this MUST also update.
+    start_offset: usize,
+
+    /// The end offset of this chunk; should correspond to `time_offsets`. If that updates,
+    /// this MUST also update.
+    end_offset: usize,
+
+    /// The actual value data!
+    data: Vec<f64>,
+}
+
+impl DataChunk {
+    /// Create a new [`DataChunk`] starting from `start_offset`.
+    pub fn new(initial_value: f64, start_offset: usize) -> Self {
+        Self {
+            start_offset,
+            end_offset: start_offset + 1,
+            data: vec![initial_value],
+        }
+    }
+
+    /// Try and prune the chunk, removing all data up to (but not including) `prune_end_index`.
+    /// Returns whether anything was pruned.
+    pub fn try_prune(&mut self, prune_end_index: usize) -> bool {
+        if prune_end_index > self.end_offset {
+            self.data.clear();
+            self.start_offset = 0;
+            self.end_offset = 0;
+
+            true
+        } else if prune_end_index > self.start_offset {
+            // We know the prune index must be between the start and end, so we're safe
+            // to blindly do subtraction here, assuming our other invariants held.
+
+            let drain_end = prune_end_index - self.start_offset;
+
+            self.data.drain(..drain_end);
+
+            self.start_offset = 0;
+            self.end_offset -= prune_end_index;
+
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Shift the offsets of this chunk down by `offset`.
+    pub fn update_indices(&mut self, offset: usize) {
+        self.start_offset -= offset;
+        self.end_offset -= offset;
+    }
+}
+
+/// Represents timeseries _value_ data in a chunked fashion.
+#[derive(Debug, Default)]
+pub struct ValueChunk {
+    /// The currently-updated chunk.
+    current: Option<DataChunk>,
+
+    /// Previous chunks; this should be added to if a data gap is found.
+    previous_chunks: Vec<DataChunk>,
+}
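+
+// Illustrative sketch (the values are made up): after adding values at offsets 0..=2,
+// calling `end_chunk`, and then adding values at offsets 5..=6, a `ValueChunk` would
+// roughly look like:
+//
+//   previous_chunks: [DataChunk { start_offset: 0, end_offset: 3, data: vec![a, b, c] }]
+//   current:         Some(DataChunk { start_offset: 5, end_offset: 7, data: vec![d, e] })
+//
+// where the missing offsets 3 and 4 represent a gap in collection for this value.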
+
+impl ValueChunk {
+    /// Add a value to this chunk.
+    pub fn add(&mut self, value: f64, index: usize) {
+        match self.current.as_mut() {
+            Some(current) => {
+                current.data.push(value);
+                current.end_offset = index + 1;
+            }
+            None => {
+                self.current = Some(DataChunk::new(value, index));
+            }
+        }
+    }
+
+    /// End the current chunk.
+    pub fn end_chunk(&mut self) {
+        if let Some(current) = self.current.take() {
+            self.previous_chunks.push(current);
+        }
+    }
+
+    /// Prune all chunks up to (and not including) `remove_up_to`, and update all
+    /// internal indices to match.
+    pub fn prune(&mut self, remove_up_to: usize) {
+        // Try to prune the current; if we _can_ prune the current, then it likely means all the
+        // previous chunks should also be pruned.
+
+        let pruned_current = if let Some(current) = self.current.as_mut() {
+            current.try_prune(remove_up_to)
+        } else {
+            false
+        };
+
+        if pruned_current {
+            // If we could prune the current chunk, then it means all other chunks are outdated. Remove them.
+            if !self.previous_chunks.is_empty() {
+                self.previous_chunks.clear();
+                self.previous_chunks.shrink_to_fit();
+            }
+        } else {
+            // Otherwise, try and prune the previous chunks + adjust the remaining chunks' offsets.
+
+            for (index, previous_chunk) in self.previous_chunks.iter_mut().enumerate().rev() {
+                if previous_chunk.try_prune(remove_up_to) {
+                    let end_index = if previous_chunk.end_offset == 0 {
+                        index + 1
+                    } else {
+                        index
+                    };
+
+                    self.previous_chunks.drain(0..end_index);
+
+                    if let Some(current) = &mut self.current {
+                        current.update_indices(remove_up_to);
+                    }
+
+                    for previous_chunk in self.previous_chunks.iter_mut().skip(1) {
+                        previous_chunk.update_indices(remove_up_to);
+                    }
+
+                    return;
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+struct DefaultInstant(Instant);
+
+impl Default for DefaultInstant {
+    fn default() -> Self {
+        Self(Instant::now())
+    }
+}
+
+/// Represents timeseries data in a chunked, deduped manner.
+///
+/// Properties:
+/// - Time is represented in a reverse-offset fashion from the current time.
+/// - All data is stored in SoA fashion.
+/// - Values are stored in a chunked format, which facilitates gaps in data collection if needed.
+/// - Additional metadata is stored to make data pruning over time easy.
+#[derive(Debug, Default)]
+pub struct TimeSeriesData {
+    /// The last-updated timestamp. The last time offset is based on this value.
+    /// When updating this value, ensure you also update `time_offsets` with the
+    /// new offset from the new time to the original value.
+    current_time: DefaultInstant,
+
+    /// All time offsets; the first element is the oldest value.
+    time_offsets: Vec<f32>,
+
+    /// Time offset ranges to help facilitate pruning. Must be in
+    /// sorted order. Offset ranges are [start, end) (that is, exclusive).
+    ///
+    /// Storing double usize might be wasteful but eh.
+    offset_ranges: Vec<(Instant, usize, usize)>,
+
+    /// Network RX data chunks.
+    rx: ValueChunk,
+
+    /// Network TX data chunks.
+    tx: ValueChunk,
+
+    /// CPU data chunks.
+    cpu: ValueChunk,
+
+    /// Memory data chunks.
+    mem: ValueChunk,
+
+    /// Swap data chunks.
+    swap: ValueChunk,
+
+    #[cfg(not(target_os = "windows"))]
+    /// Cache data chunks.
+    cache_mem: ValueChunk,
+
+    #[cfg(feature = "zfs")]
+    /// Arc data chunks.
+    arc_mem: ValueChunk,
+
+    #[cfg(feature = "gpu")]
+    /// GPU memory data chunks.
+    gpu_mem: Vec<ValueChunk>,
+}
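+
+// Rough sketch of how these fields relate (the timings are made up): if three collections
+// arrive about one second apart, `current_time` holds the timestamp of the third
+// collection, `time_offsets` holds the gap in milliseconds between each collection and
+// the one before it (e.g. `[.., 1000.0, 1000.0]`), and each `ValueChunk` stores its
+// values at the matching indices of `time_offsets`.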
+
+impl TimeSeriesData {
+    /// Add a new data point.
+    pub fn add(&mut self, data: Data) {
+        let time = data
+            .collection_time
+            .duration_since(self.current_time.0)
+            .as_millis() as f32;
+        self.current_time.0 = data.collection_time;
+        self.time_offsets.push(time);
+
+        let index = self.time_offsets.len() - 1;
+
+        if let Some(network) = data.network {
+            self.rx.add(network.rx as f64, index);
+            self.tx.add(network.tx as f64, index);
+        }
+    }
+
+    /// Prune any data older than the given duration.
+    pub fn prune(&mut self, max_age: Duration) {
+        let remove_index = match self.offset_ranges.binary_search_by(|(instant, _, _)| {
+            self.current_time
+                .0
+                .duration_since(*instant)
+                .cmp(&max_age)
+                .reverse()
+        }) {
+            Ok(index) => index,
+            Err(index) => index,
+        };
+
+        if let Some((_offset, start, end)) = self.offset_ranges.drain(0..remove_index).last() {
+            // Note that end here is _exclusive_.
+            self.time_offsets.drain(0..end);
+
+            self.rx.prune(end);
+            self.tx.prune(end);
+            self.cpu.prune(end);
+            self.mem.prune(end);
+            self.swap.prune(end);
+
+            #[cfg(not(target_os = "windows"))]
+            self.cache_mem.prune(end);
+
+            #[cfg(feature = "zfs")]
+            self.arc_mem.prune(end);
+
+            #[cfg(feature = "gpu")]
+            for gpu in &mut self.gpu_mem {
+                gpu.prune(end);
+            }
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct TimedData {
     pub rx_data: f64,
@@ -461,3 +700,180 @@ impl DataCollection {
         self.gpu_harvest = gpu;
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    /// Basic sanity test for current chunk adding/pruning behaviour.
+    #[test]
+    fn prune_current_chunk() {
+        let mut vc = ValueChunk::default();
+        let times = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
+
+        let mut index = 1;
+        for time in &times[index..] {
+            vc.add(*time * 2.0, index);
+            index += 1;
+        }
+
+        assert_eq!(
+            vc.current.as_ref().unwrap().data,
+            &[4.0, 6.0, 8.0, 10.0, 12.0]
+        );
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 1);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 6);
+
+        // Test removing partially.
+        vc.prune(3);
+        assert_eq!(vc.current.as_ref().unwrap().data, &[8.0, 10.0, 12.0]);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 3);
+
+        // Test fully clearing house.
+        vc.prune(3);
+        assert_eq!(vc.current.as_ref().unwrap().data, &[]);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
+
+        // Test re-adding values and clearing again.
+        let second_input = [1.0, 2.0, 3.0, 4.0];
+        for (index, val) in second_input.into_iter().enumerate() {
+            vc.add(val, index);
+        }
+
+        assert_eq!(vc.current.as_ref().unwrap().data, &second_input);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 4);
+
+        vc.prune(3);
+        assert_eq!(vc.current.as_ref().unwrap().data, &[4.0]);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
+
+        vc.prune(0);
+        assert_eq!(vc.current.as_ref().unwrap().data, &[4.0]);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
+
+        vc.prune(1);
+        assert_eq!(vc.current.as_ref().unwrap().data, &[]);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
+    }
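+
+    // In these tests, `prune(n)` drops every stored value whose time offset index is
+    // below `n`, then shifts the remaining start/end offsets down by `n`.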
+
+    /// Test pruning multiple chunks.
+    #[test]
+    fn prune_multi() {
+        // Let's simulate the following:
+        //
+        // |_________________|_________________|____________|
+        // 0     chunk 1     5     no data    10   chunk 2  20
+
+        let mut vc = ValueChunk::default();
+
+        for i in 0..5 {
+            vc.add((i * 10) as f64, i);
+        }
+
+        vc.end_chunk();
+
+        for i in 10..20 {
+            vc.add((i * 100) as f64, i);
+        }
+
+        assert!(vc.current.is_some());
+        assert_eq!(vc.previous_chunks.len(), 1);
+
+        assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 10);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 20);
+
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().data.len(), 5);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().end_offset, 5);
+
+        // Try partial pruning previous, make sure it affects current indices too.
+        vc.prune(3);
+
+        assert!(vc.current.is_some());
+        assert_eq!(vc.previous_chunks.len(), 1);
+
+        assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 7);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 17);
+
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().data.len(), 2);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().end_offset, 2);
+
+        // Try full pruning previous.
+        vc.prune(2);
+
+        assert!(vc.current.is_some());
+        assert!(vc.previous_chunks.is_empty());
+
+        assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 5);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 15);
+
+        // End chunk, then add a new one. Then end chunk and add a new one. Then end chunk and add a new one.
+        vc.end_chunk();
+        for i in 15..30 {
+            vc.add((i * 1000) as f64, i);
+        }
+
+        vc.end_chunk();
+        for i in 35..50 {
+            vc.add((i * 10000) as f64, i);
+        }
+
+        vc.end_chunk();
+        for i in 58..60 {
+            vc.add((i * 100000) as f64, i);
+        }
+
+        assert!(vc.current.is_some());
+        assert_eq!(vc.previous_chunks.len(), 3);
+
+        // Ensure current chunk is downgraded to previous_chunks.
+        assert_eq!(vc.previous_chunks[0].data.len(), 10);
+
+        // Try pruning the middle chunk, ensure older chunks are cleared and newer chunks are updated.
+        vc.prune(25);
+
+        assert!(vc.current.is_some());
+        assert_eq!(vc.previous_chunks.len(), 2);
+
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().data.len(), 5);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.previous_chunks.get(0).as_ref().unwrap().end_offset, 5);
+
+        // Gap of 5, so 5 + 5 = 10
+        assert_eq!(vc.previous_chunks.get(1).as_ref().unwrap().data.len(), 15);
+        assert_eq!(vc.previous_chunks.get(1).as_ref().unwrap().start_offset, 10);
+        assert_eq!(vc.previous_chunks.get(1).as_ref().unwrap().end_offset, 25);
+
+        // Gap of 8, so 25 + 8 = 33
+        assert_eq!(vc.current.as_ref().unwrap().data.len(), 2);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 33);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 35);
+
+        // Try pruning current. Ensure previous chunks are cleared.
+        vc.prune(34);
+
+        assert!(vc.current.is_some());
+        assert!(vc.previous_chunks.is_empty());
+
+        assert_eq!(vc.current.as_ref().unwrap().data.len(), 1);
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
+
+        vc.prune(1);
+
+        assert!(vc.current.as_ref().unwrap().data.is_empty());
+        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
+        assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
+    }
+}
diff --git a/src/canvas/components/tui_widget/time_chart.rs b/src/canvas/components/tui_widget/time_chart.rs
index 787d1393..78da1ea6 100644
--- a/src/canvas/components/tui_widget/time_chart.rs
+++ b/src/canvas/components/tui_widget/time_chart.rs
@@ -391,7 +391,7 @@ pub struct TimeChart<'a> {
 }
 
 impl<'a> TimeChart<'a> {
-    /// Creates a chart with the given [datasets](Dataset)
+    /// Creates a chart with the given [datasets](Dataset).
     ///
     /// A chart can render multiple datasets.
     pub fn new(datasets: Vec<Dataset<'a>>) -> TimeChart<'a> {
diff --git a/src/data_collection/memory.rs b/src/data_collection/memory.rs
index ee09a809..98275dc8 100644
--- a/src/data_collection/memory.rs
+++ b/src/data_collection/memory.rs
@@ -23,6 +23,7 @@ pub struct MemHarvest {
 
 impl MemHarvest {
     /// Return the use percentage. If the total bytes is 0, then this returns `None`.
+    #[inline]
     pub fn checked_percent(&self) -> Option<f64> {
         let used = self.used_bytes as f64;
         let total = self.total_bytes as f64;
diff --git a/src/data_conversion.rs b/src/data_conversion.rs
index 14e7a1fe..40216225 100644
--- a/src/data_conversion.rs
+++ b/src/data_conversion.rs
@@ -6,7 +6,7 @@ use std::borrow::Cow;
 
 use crate::{
-    app::{data_farmer::DataCollection, AxisScaling},
+    app::{data::DataCollection, AxisScaling},
     canvas::components::time_chart::Point,
     data_collection::{cpu::CpuDataType, memory::MemHarvest, temperature::TemperatureType},
     utils::{data_prefixes::*, data_units::DataUnit},
@@ -35,7 +35,6 @@ pub enum CpuWidgetData {
     All,
     Entry {
         data_type: CpuDataType,
-        /// A point here represents time (x) and value (y).
         data: Vec<Point>,
         last_entry: f64,
     },
diff --git a/src/widgets/process_table.rs b/src/widgets/process_table.rs
index 9085a536..32d1a63c 100644
--- a/src/widgets/process_table.rs
+++ b/src/widgets/process_table.rs
@@ -15,7 +15,7 @@ use sort_table::SortTableColumn;
 
 use crate::{
     app::{
-        data_farmer::{DataCollection, ProcessData},
+        data::{DataCollection, ProcessData},
         AppConfigFields, AppSearchState,
     },
     canvas::components::data_table::{