refactor: add new method of storing timeseries data

Clement Tsang 2024-12-26 05:25:03 -05:00 committed by ClementTsang
parent c9ffc41e51
commit 20a88e0638
No known key found for this signature in database
GPG Key ID: DC3B7867D8D97095
6 changed files with 437 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
pub mod data_farmer;
pub mod data;
pub mod filter;
pub mod frozen_state;
pub mod layout_manager;
@@ -12,7 +12,7 @@ use std::{
use anyhow::bail;
use concat_string::concat_string;
use data_farmer::*;
use data::*;
use filter::*;
use frozen_state::FrozenState;
use hashbrown::HashMap;

View File

@@ -1,19 +1,10 @@
//! In charge of cleaning, processing, and managing data. I couldn't think of
//! a better name for the file. Since I called data collection "harvesting",
//! then this is the farmer I guess.
//!
//! Essentially the main goal is to shift the initial calculation and
//! distribution of joiner points and data to one central location that will
//! only do it *once* upon receiving the data --- as opposed to doing it on
//! canvas draw, which will be a costly process.
//!
//! This will also handle the *cleaning* of stale data. That should be done
//! in some manner (timer on another thread, some loop) that will occasionally
//! call the purging function. Failure to do so *will* result in a growing
//! memory usage and higher CPU usage - you will be trying to process more and
//! more points as this is used!
//! In charge of cleaning, processing, and managing data.
use std::{collections::BTreeMap, time::Instant, vec::Vec};
use std::{
collections::BTreeMap,
time::{Duration, Instant},
vec::Vec,
};
use hashbrown::HashMap;
@@ -28,6 +19,254 @@ use crate::{
dec_bytes_per_second_string,
};
/// A chunk of value data, covering a contiguous range of indices into the shared time slice.
#[derive(Debug)]
pub struct DataChunk {
    /// The start offset of this chunk into `time_offsets`. If that updates,
    /// this MUST also update.
start_offset: usize,
    /// The end offset of this chunk into `time_offsets`. If that updates,
    /// this MUST also update.
end_offset: usize,
/// The actual value data!
data: Vec<f64>,
}
impl DataChunk {
    /// Create a new [`DataChunk`] seeded with `initial_value`, starting at `start_offset`.
pub fn new(initial_value: f64, start_offset: usize) -> Self {
Self {
start_offset,
end_offset: start_offset + 1,
data: vec![initial_value],
}
}
    /// Try to prune the chunk up to (but not including) `prune_end_index`,
    /// returning whether any data was removed.
pub fn try_prune(&mut self, prune_end_index: usize) -> bool {
if prune_end_index > self.end_offset {
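            // The prune point is past the end of this chunk; the entire chunk is stale, so drop everything.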
self.data.clear();
self.start_offset = 0;
self.end_offset = 0;
true
} else if prune_end_index > self.start_offset {
            // We know the prune index must be between the start and end, so we're safe
            // to blindly do subtraction here, assuming our other invariants hold.
let drain_end = prune_end_index - self.start_offset;
self.data.drain(..drain_end);
self.start_offset = 0;
self.end_offset -= prune_end_index;
true
} else {
false
}
}
    /// Shift this chunk's offsets down by `offset`.
pub fn update_indices(&mut self, offset: usize) {
self.start_offset -= offset;
self.end_offset -= offset;
}
}
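To make the offset bookkeeping concrete, here is a minimal sketch (not part of the commit) of `try_prune` on a chunk covering time indices 3..7, assuming module-internal field access as in the tests below:

```rust
// Hypothetical walkthrough: a chunk holding values for time indices 3..7.
let mut chunk = DataChunk::new(30.0, 3); // start_offset = 3, end_offset = 4
chunk.data.extend([40.0, 50.0, 60.0]); // now covers offsets 3..7
chunk.end_offset = 7;

// Partial prune: 3 < 5 < 7, so the first two values are dropped and the
// offsets are renormalized against the pruned time slice.
assert!(chunk.try_prune(5));
assert_eq!(chunk.data, vec![50.0, 60.0]);
assert_eq!((chunk.start_offset, chunk.end_offset), (0, 2)); // 7 - 5 = 2
```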
/// Represents timeseries _value_ data in a chunked fashion.
#[derive(Debug, Default)]
pub struct ValueChunk {
/// The currently-updated chunk.
current: Option<DataChunk>,
    /// Previous chunks; a completed chunk is pushed here when a data gap is found.
previous_chunks: Vec<DataChunk>,
}
impl ValueChunk {
    /// Add a value to this chunk at time-offset `index`.
pub fn add(&mut self, value: f64, index: usize) {
match self.current.as_mut() {
Some(current) => {
current.data.push(value);
current.end_offset = index + 1;
}
None => {
self.current = Some(DataChunk::new(value, index));
}
}
}
    /// End the current chunk, moving it into `previous_chunks`.
pub fn end_chunk(&mut self) {
if let Some(current) = self.current.take() {
self.previous_chunks.push(current);
}
}
    /// Prune all chunks up to (but not including) `remove_up_to`, and update
    /// all internal indices to match.
pub fn prune(&mut self, remove_up_to: usize) {
        // Try to prune the current chunk first; if it is affected, then every
        // previous chunk is older and must be removed entirely.
let pruned_current = if let Some(current) = self.current.as_mut() {
current.try_prune(remove_up_to)
} else {
false
};
if pruned_current {
// If we could prune the current chunk, then it means all other chunks are outdated. Remove them.
if !self.previous_chunks.is_empty() {
self.previous_chunks.clear();
self.previous_chunks.shrink_to_fit();
}
} else {
// Otherwise, try and prune the previous chunks + adjust the remaining chunks' offsets.
            for (index, previous_chunk) in self.previous_chunks.iter_mut().enumerate().rev() {
                if previous_chunk.try_prune(remove_up_to) {
                    let (drain_to, skip) = if previous_chunk.end_offset == 0 {
                        // This chunk was emptied entirely; drop it as well and
                        // shift every remaining chunk afterwards.
                        (index + 1, 0)
                    } else {
                        // This chunk was only partially pruned; try_prune has
                        // already normalized its offsets, so skip it below.
                        (index, 1)
                    };
                    self.previous_chunks.drain(0..drain_to);
                    if let Some(current) = &mut self.current {
                        current.update_indices(remove_up_to);
                    }
                    for previous_chunk in self.previous_chunks.iter_mut().skip(skip) {
                        previous_chunk.update_indices(remove_up_to);
                    }
                    return;
                }
            }
}
}
}
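A gap in data collection is modeled by ending the current chunk and starting a new one at a later time index. A minimal sketch (hypothetical, module-internal access assumed, as in the tests below):

```rust
let mut vc = ValueChunk::default();
vc.add(1.0, 0);
vc.add(2.0, 1);
vc.end_chunk(); // no data for time indices 2..4
vc.add(3.0, 4); // a fresh chunk starts at index 4

assert_eq!(vc.previous_chunks.len(), 1);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 4);
```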
#[derive(Debug, Clone, Copy)]
struct DefaultInstant(Instant);
impl Default for DefaultInstant {
fn default() -> Self {
Self(Instant::now())
}
}
/// Represents timeseries data in a chunked, deduped manner.
///
/// Properties:
/// - Time is represented as reverse offsets from the current time.
/// - All data is stored in a struct-of-arrays (SoA) fashion.
/// - Values are stored in a chunked format, which facilitates gaps in data collection if needed.
/// - Additional metadata is stored to make data pruning over time easy.
#[derive(Debug, Default)]
pub struct TimeSeriesData {
    /// The last-updated timestamp. The latest time offset is relative to
    /// this value. When updating it, make sure to also push the offset from
    /// the previous value to the new time onto `time_offsets`.
current_time: DefaultInstant,
    /// All time offsets; the first element is the oldest value.
time_offsets: Vec<f32>,
    /// Time offset ranges to help facilitate pruning. Must be in
    /// sorted order. Offset ranges are [start, end) (i.e. end-exclusive).
///
/// Storing double usize might be wasteful but eh.
offset_ranges: Vec<(Instant, usize, usize)>,
/// Network RX data chunks.
rx: ValueChunk,
/// Network TX data chunks.
tx: ValueChunk,
/// CPU data chunks.
cpu: ValueChunk,
/// Memory data chunks.
mem: ValueChunk,
/// Swap data chunks.
swap: ValueChunk,
#[cfg(not(target_os = "windows"))]
/// Cache data chunks.
cache_mem: ValueChunk,
#[cfg(feature = "zfs")]
/// Arc data chunks.
arc_mem: ValueChunk,
#[cfg(feature = "gpu")]
/// GPU memory data chunks.
gpu_mem: Vec<ValueChunk>,
}
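Each entry of `time_offsets` stores the gap in milliseconds between consecutive samples, with the newest sample anchored at `current_time` (see `add` below; the very first offset is measured against the initial placeholder instant). A hypothetical helper, not part of this commit, that recovers absolute timestamps under that assumption:

```rust
use std::time::{Duration, Instant};

// Hypothetical helper: recover each sample's absolute timestamp.
// time_offsets[i] is the gap in ms between sample i-1 and sample i;
// the last sample coincides with `current_time`.
fn absolute_times(current_time: Instant, time_offsets: &[f32]) -> Vec<Instant> {
    let mut times = vec![current_time; time_offsets.len()];
    let mut t = current_time;
    for i in (0..time_offsets.len()).rev() {
        times[i] = t;
        // Step back over the gap to sample i-1 (fractional ms are truncated here).
        t -= Duration::from_millis(time_offsets[i] as u64);
    }
    times
}
```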
impl TimeSeriesData {
/// Add a new data point.
pub fn add(&mut self, data: Data) {
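        // Record the gap from the previous sample, then advance the anchor.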
let time = data
.collection_time
.duration_since(self.current_time.0)
.as_millis() as f32;
self.current_time.0 = data.collection_time;
self.time_offsets.push(time);
let index = self.time_offsets.len() - 1;
if let Some(network) = data.network {
self.rx.add(network.rx as f64, index);
self.tx.add(network.tx as f64, index);
}
}
/// Prune any data older than the given duration.
pub fn prune(&mut self, max_age: Duration) {
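        // offset_ranges is sorted oldest-to-newest, so entry age
        // (current_time - instant) decreases along the vec; the `.reverse()`
        // below turns that into the ascending order binary search expects.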
let remove_index = match self.offset_ranges.binary_search_by(|(instant, _, _)| {
self.current_time
.0
.duration_since(*instant)
.cmp(&max_age)
.reverse()
}) {
Ok(index) => index,
Err(index) => index,
};
        if let Some((_instant, _start, end)) = self.offset_ranges.drain(0..remove_index).last() {
// Note that end here is _exclusive_.
self.time_offsets.drain(0..end);
self.rx.prune(end);
self.tx.prune(end);
self.cpu.prune(end);
self.mem.prune(end);
self.swap.prune(end);
#[cfg(not(target_os = "windows"))]
self.cache_mem.prune(end);
#[cfg(feature = "zfs")]
self.arc_mem.prune(end);
#[cfg(feature = "gpu")]
for gpu in &mut self.gpu_mem {
gpu.prune(end);
}
}
}
}
#[derive(Debug, Default, Clone)]
pub struct TimedData {
pub rx_data: f64,
@@ -461,3 +700,180 @@ impl DataCollection {
self.gpu_harvest = gpu;
}
}
#[cfg(test)]
mod test {
use super::*;
/// Basic sanity test for current chunk adding/pruning behaviour.
#[test]
fn prune_current_chunk() {
let mut vc = ValueChunk::default();
let times = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
let mut index = 1;
for time in &times[index..] {
vc.add(*time * 2.0, index);
            index += 1;
}
        assert_eq!(
            vc.current.as_ref().unwrap().data,
            &[4.0, 6.0, 8.0, 10.0, 12.0]
        );
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 1);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 6);
        // Test removing partially.
        vc.prune(3);
        assert_eq!(vc.current.as_ref().unwrap().data, &[8.0, 10.0, 12.0]);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 3);
        // Test fully clearing the chunk.
        vc.prune(3);
        assert_eq!(vc.current.as_ref().unwrap().data, &[]);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
        // Test re-adding values and clearing again.
        let second_input = [1.0, 2.0, 3.0, 4.0];
        for (index, val) in second_input.into_iter().enumerate() {
            vc.add(val, index);
        }
        assert_eq!(vc.current.as_ref().unwrap().data, &second_input);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 4);
        vc.prune(3);
        assert_eq!(vc.current.as_ref().unwrap().data, &[4.0]);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
        // Pruning at 0 should be a no-op.
        vc.prune(0);
        assert_eq!(vc.current.as_ref().unwrap().data, &[4.0]);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
        vc.prune(1);
        assert_eq!(vc.current.as_ref().unwrap().data, &[]);
        assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
        assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
}
/// Test pruning multiple chunks.
#[test]
fn prune_multi() {
// Let's simulate the following:
//
// |_________________|_________________|____________|
// 0 chunk 1 5 no data 10 chunk 2 20
let mut vc = ValueChunk::default();
for i in 0..5 {
vc.add((i * 10) as f64, i);
}
vc.end_chunk();
for i in 10..20 {
vc.add((i * 100) as f64, i);
}
assert!(vc.current.is_some());
assert_eq!(vc.previous_chunks.len(), 1);
assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 10);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 20);
        assert_eq!(vc.previous_chunks[0].data.len(), 5);
        assert_eq!(vc.previous_chunks[0].start_offset, 0);
        assert_eq!(vc.previous_chunks[0].end_offset, 5);
// Try partial pruning previous, make sure it affects current indices too.
vc.prune(3);
assert!(vc.current.is_some());
assert_eq!(vc.previous_chunks.len(), 1);
assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 7);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 17);
        assert_eq!(vc.previous_chunks[0].data.len(), 2);
        assert_eq!(vc.previous_chunks[0].start_offset, 0);
        assert_eq!(vc.previous_chunks[0].end_offset, 2);
// Try full pruning previous.
vc.prune(2);
assert!(vc.current.is_some());
assert!(vc.previous_chunks.is_empty());
assert_eq!(vc.current.as_ref().unwrap().data.len(), 10);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 5);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 15);
        // End and restart the current chunk three times, leaving gaps between chunks.
vc.end_chunk();
for i in 15..30 {
vc.add((i * 1000) as f64, i);
}
vc.end_chunk();
for i in 35..50 {
vc.add((i * 10000) as f64, i);
}
vc.end_chunk();
for i in 58..60 {
vc.add((i * 100000) as f64, i);
}
assert!(vc.current.is_some());
assert_eq!(vc.previous_chunks.len(), 3);
// Ensure current chunk is downgraded to previous_chunks.
assert_eq!(vc.previous_chunks[0].data.len(), 10);
// Try pruning the middle chunk, ensure older chunks are cleared and newer chunks are updated.
vc.prune(25);
assert!(vc.current.is_some());
assert_eq!(vc.previous_chunks.len(), 2);
        assert_eq!(vc.previous_chunks[0].data.len(), 5);
        assert_eq!(vc.previous_chunks[0].start_offset, 0);
        assert_eq!(vc.previous_chunks[0].end_offset, 5);
        // Gap of 5, so 5 + 5 = 10.
        assert_eq!(vc.previous_chunks[1].data.len(), 15);
        assert_eq!(vc.previous_chunks[1].start_offset, 10);
        assert_eq!(vc.previous_chunks[1].end_offset, 25);
// Gap of 8, so 25 + 8 = 33
assert_eq!(vc.current.as_ref().unwrap().data.len(), 2);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 33);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 35);
// Try pruning current. Ensure previous chunks are cleared.
vc.prune(34);
assert!(vc.current.is_some());
assert!(vc.previous_chunks.is_empty());
assert_eq!(vc.current.as_ref().unwrap().data.len(), 1);
assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 1);
vc.prune(1);
assert!(vc.current.as_ref().unwrap().data.is_empty());
assert_eq!(vc.current.as_ref().unwrap().start_offset, 0);
assert_eq!(vc.current.as_ref().unwrap().end_offset, 0);
}
}

View File

@@ -391,7 +391,7 @@ pub struct TimeChart<'a> {
}
impl<'a> TimeChart<'a> {
/// Creates a chart with the given [datasets](Dataset)
/// Creates a chart with the given [datasets](Dataset),
///
/// A chart can render multiple datasets.
pub fn new(datasets: Vec<Dataset<'a>>) -> TimeChart<'a> {

View File

@@ -23,6 +23,7 @@ pub struct MemHarvest {
impl MemHarvest {
    /// Return the usage percentage. If the total byte count is 0, this
    /// returns `None`.
#[inline]
pub fn checked_percent(&self) -> Option<f64> {
let used = self.used_bytes as f64;
let total = self.total_bytes as f64;

View File

@@ -6,7 +6,7 @@
use std::borrow::Cow;
use crate::{
app::{data_farmer::DataCollection, AxisScaling},
app::{data::DataCollection, AxisScaling},
canvas::components::time_chart::Point,
data_collection::{cpu::CpuDataType, memory::MemHarvest, temperature::TemperatureType},
utils::{data_prefixes::*, data_units::DataUnit},
@@ -35,7 +35,6 @@ pub enum CpuWidgetData {
All,
Entry {
data_type: CpuDataType,
/// A point here represents time (x) and value (y).
data: Vec<Point>,
last_entry: f64,
},

View File

@@ -15,7 +15,7 @@ use sort_table::SortTableColumn;
use crate::{
app::{
data_farmer::{DataCollection, ProcessData},
data::{DataCollection, ProcessData},
AppConfigFields, AppSearchState,
},
canvas::components::data_table::{