Merge branch 'ent-7635-11977-procesamiento-de-traps-multi-hilo' into 'develop'

Ent 7635 11977 procesamiento de traps multi hilo

See merge request artica/pandorafms!4187
This commit is contained in:
Daniel Rodriguez 2021-06-23 15:16:47 +00:00
commit 2a48208fc2
5 changed files with 136 additions and 11 deletions

View File

@ -90,6 +90,12 @@ snmpconsole 0
snmpconsole_threads 1 snmpconsole_threads 1
# If set to 1, traps from the same source will never be processed in parallel. 0 by default.
#snmpconsole_lock 0
# Time between consecutive reads of the SNMP log file in seconds. Defaults to server_threshold.
#snmpconsole_threshold 5
# Attempt to translate variable bindings when processing SNMP traps. 1 enabled, 0 disabled. 0 by default. (ENTERPRISE ONLY). # Attempt to translate variable bindings when processing SNMP traps. 1 enabled, 0 disabled. 0 by default. (ENTERPRISE ONLY).
translate_variable_bindings 0 translate_variable_bindings 0

View File

@ -332,6 +332,8 @@ sub pandora_load_config {
$pa_config->{"dynamic_warning"} = 25; # 7.0 $pa_config->{"dynamic_warning"} = 25; # 7.0
$pa_config->{"dynamic_constant"} = 10; # 7.0 $pa_config->{"dynamic_constant"} = 10; # 7.0
$pa_config->{"mssql_driver"} = undef; # 745 $pa_config->{"mssql_driver"} = undef; # 745
$pa_config->{"snmpconsole_lock"} = 0; # 755.
$pa_config->{"snmpconsole_period"} = 0; # 755.
# Internal MTA for alerts, each server need its own config. # Internal MTA for alerts, each server need its own config.
$pa_config->{"mta_address"} = ''; # Introduced on 2.0 $pa_config->{"mta_address"} = ''; # Introduced on 2.0
@ -675,6 +677,12 @@ sub pandora_load_config {
elsif ($parametro =~ m/^snmpconsole_threads\s+(\d+)/i) { elsif ($parametro =~ m/^snmpconsole_threads\s+(\d+)/i) {
$pa_config->{'snmpconsole_threads'}= clean_blank($1); $pa_config->{'snmpconsole_threads'}= clean_blank($1);
} }
elsif ($parametro =~ m/^snmpconsole_lock\s+([0-1])/i) {
$pa_config->{'snmpconsole_lock'}= clean_blank($1);
}
elsif ($parametro =~ m/^snmpconsole_threshold\s+(\d+(?:\.\d+){0,1})/i) {
$pa_config->{'snmpconsole_threshold'}= clean_blank($1);
}
elsif ($parametro =~ m/^translate_variable_bindings\s+([0-1])/i) { elsif ($parametro =~ m/^translate_variable_bindings\s+([0-1])/i) {
$pa_config->{'translate_variable_bindings'}= clean_blank($1); $pa_config->{'translate_variable_bindings'}= clean_blank($1);
} }

View File

@ -23,6 +23,7 @@ use warnings;
use threads; use threads;
use threads::shared; use threads::shared;
use Thread::Semaphore; use Thread::Semaphore;
use Time::HiRes qw(usleep);
# Default lib dir for RPM and DEB packages # Default lib dir for RPM and DEB packages
use lib '/usr/lib/perl5'; use lib '/usr/lib/perl5';
@ -163,7 +164,7 @@ sub data_producer ($$$$$) {
$self->setQueueSize (scalar @{$task_queue}); $self->setQueueSize (scalar @{$task_queue});
threads->yield; threads->yield;
sleep ($pa_config->{'server_threshold'}); usleep (int(1e6 * $self->getPeriod()));
} }
}; };

View File

@ -45,16 +45,20 @@ our @ISA = qw(PandoraFMS::ProducerConsumerServer);
my @TaskQueue :shared; my @TaskQueue :shared;
my %PendingTasks :shared; my %PendingTasks :shared;
my $Sem :shared; my $Sem :shared;
my %Sources :shared;
my $SourceSem :shared;
my $TaskSem :shared; my $TaskSem :shared;
# Trap statistics by agent # Trap statistics by agent
my %AGENTS = (); my %AGENTS = ();
# Sources silenced by storm protection. # Sources silenced by storm protection.
my %SILENCEDSOURCES = (); my %SILENCEDSOURCES = ();
# Index and buffer management for trap log files # Index and buffer management for trap log files
my $SNMPTRAPD = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; my $SNMPTRAPD = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 };
my $DATASERVER = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; my $DATASERVER = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 };
my $BUFFER = { 'log_file' => undef, 'fd' => [], 'idx_file' => undef, 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => undef, 'read_ahead_pos' => 0 };
######################################################################################## ########################################################################################
# SNMP Server class constructor. # SNMP Server class constructor.
@ -96,6 +100,7 @@ sub new ($$$) {
%PendingTasks = (); %PendingTasks = ();
$Sem = Thread::Semaphore->new; $Sem = Thread::Semaphore->new;
$TaskSem = Thread::Semaphore->new (0); $TaskSem = Thread::Semaphore->new (0);
$SourceSem = Thread::Semaphore->new (1);
# Call the constructor of the parent class # Call the constructor of the parent class
my $self = $class->SUPER::new($config, SNMPCONSOLE, \&PandoraFMS::SNMPServer::data_producer, \&PandoraFMS::SNMPServer::data_consumer, $dbh); my $self = $class->SUPER::new($config, SNMPCONSOLE, \&PandoraFMS::SNMPServer::data_producer, \&PandoraFMS::SNMPServer::data_consumer, $dbh);
@ -119,6 +124,11 @@ sub run ($) {
# Set the initial date for storm protection. # Set the initial date for storm protection.
$pa_config->{"__storm_ref__"} = time(); $pa_config->{"__storm_ref__"} = time();
# Set a server-specific period.
if ($pa_config->{'snmpconsole_threshold'} > 0) {
$self->setPeriod($pa_config->{'snmpconsole_threshold'});
}
$self->setNumThreads ($pa_config->{'snmpconsole_threads'}); $self->setNumThreads ($pa_config->{'snmpconsole_threads'});
$self->SUPER::run (\@TaskQueue, \%PendingTasks, $Sem, $TaskSem); $self->SUPER::run (\@TaskQueue, \%PendingTasks, $Sem, $TaskSem);
} }
@ -130,7 +140,9 @@ sub data_producer ($) {
my $self = shift; my $self = shift;
my ($pa_config, $dbh) = ($self->getConfig (), $self->getDBH ()); my ($pa_config, $dbh) = ($self->getConfig (), $self->getDBH ());
my %tasks_by_source;
my @tasks; my @tasks;
my @buffer;
# Reset storm protection counters # Reset storm protection counters
my $curr_time = time (); my $curr_time = time ();
@ -139,7 +151,12 @@ sub data_producer ($) {
%AGENTS = (); %AGENTS = ();
} }
for my $fs (($SNMPTRAPD, $DATASERVER)) { # Make a local copy of locked sources.
$SourceSem->down ();
my $local_sources = {%Sources};
$SourceSem->up ();
for my $fs (($BUFFER, $SNMPTRAPD, $DATASERVER)) {
next unless defined($fs->{'fd'}); next unless defined($fs->{'fd'});
reset_if_truncated($pa_config, $fs); reset_if_truncated($pa_config, $fs);
while (my $line_with_pos = read_snmplogfile($fs)) { while (my $line_with_pos = read_snmplogfile($fs)) {
@ -151,10 +168,11 @@ sub data_producer ($) {
chomp ($line); chomp ($line);
# Update index file # Update index file
open(my $idxfd, '>' . $fs->{'idx_file'}); if (defined($fs->{'idx_file'})) {
print $idxfd $fs->{'last_line'} . ' ' . $fs->{'last_size'}; open(my $idxfd, '>' . $fs->{'idx_file'});
close $idxfd; print $idxfd $fs->{'last_line'} . ' ' . $fs->{'last_size'};
set_file_permissions($pa_config, $fs->{'idx_file'}, "0666"); close $idxfd;
}
# Skip lines other than SNMP Trap logs # Skip lines other than SNMP Trap logs
next unless ($line =~ m/^SNMPv[12]\[\*\*\]/); next unless ($line =~ m/^SNMPv[12]\[\*\*\]/);
@ -189,10 +207,18 @@ sub data_producer ($) {
next; next;
} }
push (@tasks, $line); # Either buffer or process the trap.
if (source_lock($pa_config, $source, $local_sources) == 0) {
push(@buffer, $line);
} else {
push (@tasks, $line);
}
} }
} }
# Save the buffer for the next run.
$BUFFER->{'fd'} = \@buffer;
return @tasks; return @tasks;
} }
@ -201,8 +227,18 @@ sub data_producer ($) {
############################################################################### ###############################################################################
sub data_consumer ($$) { sub data_consumer ($$) {
my ($self, $task) = @_; my ($self, $task) = @_;
my ($pa_config, $server_id, $dbh) = ($self->getConfig(), $self->getServerID(), $self->getDBH());
pandora_snmptrapd ($self->getConfig (), $task, $self->getServerID (), $self->getDBH ()); pandora_snmptrapd ($pa_config, $task, $server_id, $dbh);
# Unlock.
if ($pa_config->{'snmpconsole_lock'} == 1) {
my ($ver, $date, $time, $source, $null) = split(/\[\*\*\]/, $task, 5);
if ($ver eq "SNMPv2" || $pa_config->{'snmp_pdu_address'} eq '1' ) {
$source =~ s/(?:(?:TCP|UDP):\s*)?\[?([^] ]+)\]?(?::-?\d+)?(?:\s*->.*)?$/$1/;
}
source_unlock($pa_config, $source);
}
} }
########################################################################## ##########################################################################
@ -458,6 +494,15 @@ sub read_snmplogfile($) {
my $line; my $line;
my $pos; my $pos;
# Reading from a temporary buffer.
if (ref($fs->{'fd'}) eq 'ARRAY') {
if ($#{$fs->{'fd'}} < 0) {
return undef;
}
return [0, shift(@{$fs->{'fd'}})];
}
if(defined($fs->{'read_ahead_line'})) { if(defined($fs->{'read_ahead_line'})) {
# Restore saved line # Restore saved line
$line = $fs->{'read_ahead_line'}; $line = $fs->{'read_ahead_line'};
@ -537,6 +582,10 @@ sub init_log_file($$$) {
sub reset_if_truncated($$) { sub reset_if_truncated($$) {
my ($pa_config, $fs) = @_; my ($pa_config, $fs) = @_;
if (!defined($fs->{'log_file'})) {
return;
}
my $log_size = (stat ($fs->{'log_file'}))[7]; my $log_size = (stat ($fs->{'log_file'}))[7];
# New SNMP log file found # New SNMP log file found
@ -548,6 +597,45 @@ sub reset_if_truncated($$) {
} }
} }
##########################################################################
# Get a lock on the given source. Return 1 on success, 0 otherwise.
##########################################################################
sub source_lock($$$) {
my ($pa_config, $source, $local_sources) = @_;
# Locking is disabled.
if ($pa_config->{'snmpconsole_lock'} == 0) {
return 1;
}
if (defined($local_sources->{$source})) {
return 0;
}
$local_sources->{$source} = 1;
$SourceSem->down ();
$Sources{$source} = 1;
$SourceSem->up ();
return 1;
}
##########################################################################
# Remove the lock on the given source.
##########################################################################
sub source_unlock {
my ($pa_config, $source) = @_;
# Locking is disabled.
if ($pa_config->{'snmpconsole_lock'} == 0) {
return;
}
$SourceSem->down ();
delete ($Sources{$source});
$SourceSem->up ();
}
############################################################################### ###############################################################################
# Clean-up when the server is destroyed. # Clean-up when the server is destroyed.
############################################################################### ###############################################################################

View File

@ -46,12 +46,16 @@ sub new ($$$;$) {
_num_threads => 1, _num_threads => 1,
_threads => [], _threads => [],
_queue_size => 0, _queue_size => 0,
_errstr => '' _errstr => '',
_period => 0
}; };
# Share variables that may be set from different threads # Share variables that may be set from different threads
share ($self->{'_queue_size'}); share ($self->{'_queue_size'});
share ($self->{'_errstr'}); share ($self->{'_errstr'});
# Set the default period.
$self->{'_period'} = $self->{'_pa_config'}->{'server_threshold'};
bless $self, $class; bless $self, $class;
return $self; return $self;
@ -190,6 +194,24 @@ sub getErrStr ($) {
return $self->{'_errstr'}; return $self->{'_errstr'};
} }
########################################################################################
# Get period.
########################################################################################
sub getPeriod ($) {
my $self = shift;
return $self->{'_period'};
}
########################################################################################
# Set period.
########################################################################################
sub setPeriod ($$) {
my ($self, $period) = @_;
$self->{'_period'} = $period;
}
######################################################################################## ########################################################################################
# Set event storm protection. # Set event storm protection.
######################################################################################## ########################################################################################