Changes on compact db
This commit is contained in:
parent
981f8c709b
commit
66b614d631
|
@ -276,6 +276,7 @@ CREATE TABLE IF NOT EXISTS `tagente_modulo` (
|
|||
`warning_time` INT UNSIGNED DEFAULT 0,
|
||||
`quiet_by_downtime` TINYINT NOT NULL DEFAULT 0,
|
||||
`disabled_by_downtime` TINYINT NOT NULL DEFAULT 0,
|
||||
`last_compact` TIMESTAMP NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (`id_agente_modulo`),
|
||||
KEY `main_idx` (`id_agente_modulo`,`id_agente`),
|
||||
KEY `tam_agente` (`id_agente`),
|
||||
|
|
|
@ -25,6 +25,9 @@ use DBI; # DB interface with MySQL
|
|||
use POSIX qw(strftime);
|
||||
use File::Path qw(rmtree);
|
||||
use Time::HiRes qw(usleep);
|
||||
use Time::Piece;
|
||||
use List::Util qw(min);
|
||||
use List::Util qw(sum);
|
||||
|
||||
# Default lib dir for RPM and DEB packages
|
||||
BEGIN { push @INC, '/usr/lib/perl5'; }
|
||||
|
@ -443,116 +446,187 @@ sub pandora_purgedb ($$$) {
|
|||
###############################################################################
|
||||
# Compact agent data.
|
||||
###############################################################################
|
||||
sub pandora_compactdb ($$$) {
|
||||
my ($conf, $dbh, $dbh_conf) = @_;
|
||||
sub pandora_compactdb {
|
||||
my ($conf, $dbh, $dbh_conf) = @_;
|
||||
|
||||
my $total_modules = get_db_value($dbh, "SELECT COUNT(id_agente_modulo) FROM tagente_modulo");
|
||||
|
||||
# Interval in hours to compact.
|
||||
my $compaction_interval = 24;
|
||||
my $compaction_factor = (3600 * $compaction_interval) / 300;
|
||||
|
||||
# Number of agents to be proceced on this execution
|
||||
my $agents_limit = int($total_modules / $compaction_factor);
|
||||
|
||||
my $last_compact_offset = pandora_get_config_value($dbh, "last_compact_offset");
|
||||
|
||||
unless ($last_compact_offset) {
|
||||
db_do($dbh, "INSERT INTO tconfig (token, value) VALUES ('last_compact_offset', '0')");
|
||||
$last_compact_offset = 0;
|
||||
}
|
||||
|
||||
# Obtain a group of modules to compact.
|
||||
my @module_groups = get_db_rows(
|
||||
$dbh,
|
||||
'SELECT id_agente_modulo, id_tipo_modulo, UNIX_TIMESTAMP(last_compact) as last_compact FROM tagente_modulo WHERE id_agente_modulo > ? LIMIT ?',
|
||||
$last_compact_offset,
|
||||
$agents_limit
|
||||
);
|
||||
|
||||
# Compact the group of modules.
|
||||
my $starting_time = time();
|
||||
pandora_compact_modules($dbh, $conf, @module_groups);
|
||||
my $ending_time = time();
|
||||
log_message('COMPACT', "Time taken: " . ($ending_time - $starting_time) . " seconds.");
|
||||
|
||||
# Add the offset.
|
||||
$last_compact_offset += $agents_limit;
|
||||
|
||||
# If the offset is higher than the max module start again
|
||||
$last_compact_offset = 0 if ($last_compact_offset >= $total_modules);
|
||||
|
||||
# Save the new offset
|
||||
db_do($dbh, "UPDATE tconfig SET value = ? WHERE token = 'last_compact_offset'", $last_compact_offset);
|
||||
}
|
||||
|
||||
sub pandora_compact_modules {
|
||||
my ($dbh, $conf, @module_groups) = @_;
|
||||
|
||||
# Obtain the minimun last compact from the block
|
||||
my $min_compact = min(map { $_->{"last_compact"} } @module_groups);
|
||||
|
||||
# if the min has not been setted
|
||||
if($min_compact == 0){
|
||||
$min_compact = get_db_value(
|
||||
$dbh,
|
||||
'SELECT MIN(td.utimestamp)
|
||||
FROM tagente_datos td, tagente_modulo tm
|
||||
WHERE td.id_agente_modulo = tm.id_agente_modulo
|
||||
AND tm.id_tipo_modulo not in (2, 6, 9, 18, 21, 31, 35, 100)
|
||||
AND td.id_agente_modulo BETWEEN ? AND ?',
|
||||
$module_groups[0]->{"id_agente_modulo"}, $module_groups[-1]->{"id_agente_modulo"}
|
||||
);
|
||||
}
|
||||
|
||||
# One week of data of data as max from the last min compact.
|
||||
my $max_compact = $min_compact + (60 * 60 * 24 * 7);
|
||||
|
||||
# If the last compact is on the future set the current time as max.
|
||||
$max_compact = time() if($max_compact > time());
|
||||
|
||||
log_message('COMPACT', "Compacting data of agents agents $module_groups[0]->{'id_agente_modulo'} to $module_groups[-1]->{'id_agente_modulo'}, data between " . strftime('%Y-%m-%d %H:%M:%S', localtime($min_compact)) . " and " . strftime('%Y-%m-%d %H:%M:%S', localtime($max_compact)));
|
||||
|
||||
# Obtain all the data from the interval to avoid multiple requests
|
||||
# Avoid pull data from modules that should be skipped.
|
||||
my @data = get_db_rows (
|
||||
$dbh,
|
||||
'SELECT td.id_agente_modulo, td.datos, td.utimestamp
|
||||
FROM tagente_datos td, tagente_modulo tm
|
||||
WHERE td.id_agente_modulo = tm.id_agente_modulo
|
||||
AND tm.id_tipo_modulo not in (2, 6, 9, 18, 21, 31, 35, 100)
|
||||
AND td.utimestamp < ? AND td.utimestamp >= ?
|
||||
AND td.id_agente_modulo BETWEEN ? AND ?
|
||||
',
|
||||
$max_compact, $min_compact, $module_groups[0]->{"id_agente_modulo"}, $module_groups[-1]->{"id_agente_modulo"}
|
||||
);
|
||||
|
||||
my $total_data = scalar(@data);
|
||||
my $proceced_total_data = 0;
|
||||
my $progress = 0;
|
||||
my $compactations = 0;
|
||||
|
||||
my %count_hash;
|
||||
my %id_agent_hash;
|
||||
my %value_hash;
|
||||
my %module_proc_hash;
|
||||
|
||||
return if ($conf->{'_days_compact'} == 0 || $conf->{'_step_compact'} < 1);
|
||||
|
||||
# Convert compact interval length from hours to seconds
|
||||
my $step = $conf->{'_step_compact'} * 3600;
|
||||
|
||||
# The oldest timestamp will be the lower limit
|
||||
my $limit_utime = get_db_value ($dbh, 'SELECT min(utimestamp) as min FROM tagente_datos');
|
||||
return unless (defined ($limit_utime) && $limit_utime > 0);
|
||||
# Max. 168 steps or 7 days in one hour steps.
|
||||
# This avoids blocking of old modules without last compact
|
||||
my $step_limit = 24 * 7;
|
||||
|
||||
# Calculate the start date
|
||||
my $start_utime = time() - $conf->{'_days_compact'} * 24 * 60 * 60;
|
||||
my $last_compact = $start_utime;
|
||||
my $stop_utime;
|
||||
# Compact the modules in this block
|
||||
foreach my $module (@module_groups) {
|
||||
$progress = $total_data == 0 ? 0 : ($proceced_total_data / $total_data) * 100;
|
||||
printf(strftime("\r" . "%H:%M:%S", localtime()) . ' [COMPACT] ' . "Progress: %.2f%%", $progress);
|
||||
|
||||
my $id = $module->{"id_agente_modulo"};
|
||||
my $module_type = $module->{"id_tipo_modulo"};
|
||||
|
||||
# Do not compact the same data twice!
|
||||
if (defined ($conf->{'_last_compact'}) && $conf->{'_last_compact'} > $limit_utime) {
|
||||
$limit_utime = $conf->{'_last_compact'};
|
||||
}
|
||||
|
||||
if ($start_utime <= $limit_utime || ( defined ($conf->{'_last_compact'}) && (($conf->{'_last_compact'} + 24 * 60 * 60) > $start_utime))) {
|
||||
log_message ('COMPACT', "Data already compacted.");
|
||||
return;
|
||||
}
|
||||
|
||||
log_message ('COMPACT', "Compacting data from " . strftime ("%Y-%m-%d %H:%M:%S", localtime($limit_utime)) . " to " . strftime ("%Y-%m-%d %H:%M:%S", localtime($start_utime)) . '.', '');
|
||||
next unless defined ($module_type);
|
||||
next if ($module_type == 2 || $module_type == 6 || $module_type == 9 || $module_type == 18 || $module_type == 21 || $module_type == 31 || $module_type == 35 || $module_type == 100);
|
||||
|
||||
# Prepare the query to retrieve data from an interval
|
||||
while (1) {
|
||||
# Obtain the data just for this module
|
||||
my @module_data = grep { $_->{"id_agente_modulo"} == $id } @data;
|
||||
my $total_elements = scalar(@module_data);
|
||||
|
||||
# Calculate the stop date for the interval
|
||||
$stop_utime = $start_utime - $step;
|
||||
# No data for this module.
|
||||
next if ($total_elements == 0);
|
||||
|
||||
# Out of limits
|
||||
last if ($start_utime < $limit_utime);
|
||||
# Obtain the last compact of this module to avoid compact something previously compacted
|
||||
my $last_compact = $module ->{"last_compact"};
|
||||
|
||||
# Mark the progress
|
||||
log_message ('', ".");
|
||||
# In case that the last compact has not been defined take the minimun data as last compact
|
||||
$last_compact = min(map { $_->{"utimestamp"} } @module_data)-1 if($last_compact == 0);
|
||||
|
||||
# Create an array to store INSERT commands
|
||||
my @insert_commands;
|
||||
my $insert_command = 'INSERT INTO tagente_datos (id_agente_modulo, datos, utimestamp) VALUES (?, ?, ?)';
|
||||
|
||||
my $step_number = 0;
|
||||
my $first_compact = $last_compact;
|
||||
|
||||
# Compact using the steps
|
||||
while($step_number < $step_limit){
|
||||
|
||||
my $next_compact = $last_compact + $step;
|
||||
|
||||
my @data = get_db_rows ($dbh, 'SELECT * FROM tagente_datos WHERE utimestamp < ? AND utimestamp >= ?', $start_utime, $stop_utime);
|
||||
# No data, move to the next interval
|
||||
if ($#data == 0) {
|
||||
$start_utime = $stop_utime;
|
||||
last if($next_compact > time());
|
||||
|
||||
# Obtain the data between the last compact and the next step.
|
||||
my @data_in_range = grep { $_->{"utimestamp"} > $last_compact && $_->{"utimestamp"} <= $next_compact } @module_data;
|
||||
|
||||
my $total_range_elements = scalar(@data_in_range);
|
||||
|
||||
# Nothing to compress, skip this step.
|
||||
if($total_range_elements == 0){
|
||||
$last_compact = $next_compact;
|
||||
next;
|
||||
}
|
||||
|
||||
# Get interval data
|
||||
foreach my $data (@data) {
|
||||
my $id_module = $data->{'id_agente_modulo'};
|
||||
if (! defined($module_proc_hash{$id_module})) {
|
||||
my $module_type = get_db_value ($dbh, 'SELECT id_tipo_modulo FROM tagente_modulo WHERE id_agente_modulo = ?', $id_module);
|
||||
next unless defined ($module_type);
|
||||
my $total_data = sum(map { $_->{"datos"} } @data_in_range);
|
||||
|
||||
# Mark proc modules.
|
||||
if ($module_type == 2 || $module_type == 6 || $module_type == 9 || $module_type == 18 || $module_type == 21 || $module_type == 31 || $module_type == 35 || $module_type == 100) {
|
||||
$module_proc_hash{$id_module} = 1;
|
||||
}
|
||||
else {
|
||||
$module_proc_hash{$id_module} = 0;
|
||||
}
|
||||
}
|
||||
my $avg = $total_data / $total_range_elements;
|
||||
|
||||
# Skip proc modules!
|
||||
next if ($module_proc_hash{$id_module} == 1);
|
||||
$proceced_total_data += $total_range_elements;
|
||||
|
||||
push @insert_commands, [$insert_command, $id, $avg, int($last_compact + ($step / 2))];
|
||||
|
||||
if (! defined($value_hash{$id_module})) {
|
||||
$value_hash{$id_module} = 0;
|
||||
$count_hash{$id_module} = 0;
|
||||
$last_compact = $next_compact;
|
||||
|
||||
$step_number +=1;
|
||||
# Small sleep to don't burn the DB
|
||||
usleep (1000);
|
||||
|
||||
}
|
||||
|
||||
if (! defined($id_agent_hash{$id_module})) {
|
||||
$id_agent_hash{$id_module} = $data->{'id_agente'};
|
||||
}
|
||||
}
|
||||
$dbh->begin_work;
|
||||
|
||||
$value_hash{$id_module} += $data->{'datos'};
|
||||
$count_hash{$id_module}++;
|
||||
}
|
||||
db_do ($dbh, 'DELETE FROM tagente_datos WHERE utimestamp > ? AND utimestamp <= ? AND id_agente_modulo = ?', $first_compact, $last_compact, $id);
|
||||
|
||||
# Delete interval from the database
|
||||
db_do ($dbh, 'DELETE ad FROM tagente_datos ad
|
||||
INNER JOIN tagente_modulo am ON ad.id_agente_modulo = am.id_agente_modulo AND am.id_tipo_modulo NOT IN (2,6,9,18,21,31,35,100)
|
||||
WHERE ad.utimestamp < ? AND ad.utimestamp >= ?', $start_utime, $stop_utime);
|
||||
# Execute the INSERT commands
|
||||
foreach my $command (@insert_commands) {
|
||||
my ($sql, @params) = @$command;
|
||||
db_do($dbh, $sql, @params);
|
||||
}
|
||||
|
||||
# Insert interval average value
|
||||
foreach my $key (keys(%value_hash)) {
|
||||
$value_hash{$key} /= $count_hash{$key};
|
||||
db_do ($dbh, 'INSERT INTO tagente_datos (id_agente_modulo, datos, utimestamp) VALUES (?, ?, ?)', $key, $value_hash{$key}, $stop_utime);
|
||||
delete($value_hash{$key});
|
||||
delete($count_hash{$key});
|
||||
}
|
||||
|
||||
usleep (1000); # Very small usleep, just to don't burn the DB
|
||||
# Move to the next interval
|
||||
$start_utime = $stop_utime;
|
||||
$dbh->commit;
|
||||
# Update the last compacted timestamp.
|
||||
db_do($dbh, "UPDATE tagente_modulo SET last_compact = FROM_UNIXTIME(?) WHERE id_agente_modulo = ?", $last_compact, $module ->{"id_agente_modulo"});
|
||||
$compactations += $step_number;
|
||||
}
|
||||
log_message ('', "\n");
|
||||
|
||||
# Mark the last compact date
|
||||
if (defined ($conf->{'_last_compact'})) {
|
||||
db_do ($dbh_conf, 'UPDATE tconfig SET value=? WHERE token=?', $last_compact, 'last_compact');
|
||||
} else {
|
||||
db_do ($dbh_conf, 'INSERT INTO tconfig (value, token) VALUES (?, ?)', $last_compact, 'last_compact');
|
||||
}
|
||||
printf(strftime("\r" . "%H:%M:%S", localtime()) . ' [COMPACT] ' . "Progress: %.2f%%", 100);
|
||||
print("\n");
|
||||
log_message('COMPACT', "A total of $proceced_total_data elements has been compacted into $compactations elements");
|
||||
|
||||
}
|
||||
|
||||
########################################################################
|
||||
|
@ -1234,9 +1308,8 @@ sub pandoradb_main {
|
|||
# Only active database should be compacted. Disabled for historical database.
|
||||
# Compact on if enable and DaysCompact are below DaysPurge
|
||||
if (($conf->{'_onlypurge'} == 0)
|
||||
&& ($conf->{'_days_compact'} < $conf->{'_days_purge'})
|
||||
) {
|
||||
pandora_compactdb ($conf, defined ($history_dbh) ? $history_dbh : $dbh, $dbh);
|
||||
pandora_compactdb ($conf, $dbh, $dbh);
|
||||
}
|
||||
|
||||
# Update tconfig with last time of database maintance time (now)
|
||||
|
|
Loading…
Reference in New Issue