diff --git a/pandora_server/conf/pandora_server.conf.new b/pandora_server/conf/pandora_server.conf.new index 86da53430e..6a411c5e6d 100644 --- a/pandora_server/conf/pandora_server.conf.new +++ b/pandora_server/conf/pandora_server.conf.new @@ -90,6 +90,12 @@ snmpconsole 0 snmpconsole_threads 1 +# If set to 1, traps from the same source will never be processed in parallel. 0 by default. +#snmpconsole_lock 0 + +# Time between consecutive reads of the SNMP log file in seconds. Defaults to server_threshold. +#snmpconsole_threshold 5 + # Attempt to translate variable bindings when processing SNMP traps. 1 enabled, 0 disabled. 0 by default. (ENTERPRISE ONLY). translate_variable_bindings 0 diff --git a/pandora_server/lib/PandoraFMS/Config.pm b/pandora_server/lib/PandoraFMS/Config.pm index 63a2d65e87..3edae37100 100644 --- a/pandora_server/lib/PandoraFMS/Config.pm +++ b/pandora_server/lib/PandoraFMS/Config.pm @@ -332,6 +332,8 @@ sub pandora_load_config { $pa_config->{"dynamic_warning"} = 25; # 7.0 $pa_config->{"dynamic_constant"} = 10; # 7.0 $pa_config->{"mssql_driver"} = undef; # 745 + $pa_config->{"snmpconsole_lock"} = 0; # 755. + $pa_config->{"snmpconsole_period"} = 0; # 755. # Internal MTA for alerts, each server need its own config. $pa_config->{"mta_address"} = ''; # Introduced on 2.0 @@ -675,6 +677,12 @@ sub pandora_load_config { elsif ($parametro =~ m/^snmpconsole_threads\s+(\d+)/i) { $pa_config->{'snmpconsole_threads'}= clean_blank($1); } + elsif ($parametro =~ m/^snmpconsole_lock\s+([0-1])/i) { + $pa_config->{'snmpconsole_lock'}= clean_blank($1); + } + elsif ($parametro =~ m/^snmpconsole_threshold\s+(\d+(?:\.\d+){0,1})/i) { + $pa_config->{'snmpconsole_threshold'}= clean_blank($1); + } elsif ($parametro =~ m/^translate_variable_bindings\s+([0-1])/i) { $pa_config->{'translate_variable_bindings'}= clean_blank($1); } diff --git a/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm b/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm index 3a9a73d46c..44ea8212fe 100644 --- a/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm +++ b/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm @@ -23,6 +23,7 @@ use warnings; use threads; use threads::shared; use Thread::Semaphore; +use Time::HiRes qw(usleep); # Default lib dir for RPM and DEB packages use lib '/usr/lib/perl5'; @@ -141,7 +142,7 @@ sub data_producer ($$$$$) { $self->setQueueSize (scalar @{$task_queue}); threads->yield; - sleep ($pa_config->{'server_threshold'}); + usleep (int(1e6 * $self->getPeriod())); } }; diff --git a/pandora_server/lib/PandoraFMS/SNMPServer.pm b/pandora_server/lib/PandoraFMS/SNMPServer.pm index 2a0eb7ff08..359c93d233 100644 --- a/pandora_server/lib/PandoraFMS/SNMPServer.pm +++ b/pandora_server/lib/PandoraFMS/SNMPServer.pm @@ -45,16 +45,20 @@ our @ISA = qw(PandoraFMS::ProducerConsumerServer); my @TaskQueue :shared; my %PendingTasks :shared; my $Sem :shared; +my %Sources :shared; +my $SourceSem :shared; my $TaskSem :shared; # Trap statistics by agent my %AGENTS = (); + # Sources silenced by storm protection. my %SILENCEDSOURCES = (); # Index and buffer management for trap log files -my $SNMPTRAPD = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; -my $DATASERVER = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; +my $SNMPTRAPD = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; +my $DATASERVER = { 'log_file' => '', 'fd' => undef, 'idx_file' => '', 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => '', 'read_ahead_pos' => 0 }; +my $BUFFER = { 'log_file' => undef, 'fd' => [], 'idx_file' => undef, 'last_line' => 0, 'last_size' => 0, 'read_ahead_line' => undef, 'read_ahead_pos' => 0 }; ######################################################################################## # SNMP Server class constructor. @@ -96,6 +100,7 @@ sub new ($$$) { %PendingTasks = (); $Sem = Thread::Semaphore->new; $TaskSem = Thread::Semaphore->new (0); + $SourceSem = Thread::Semaphore->new (1); # Call the constructor of the parent class my $self = $class->SUPER::new($config, SNMPCONSOLE, \&PandoraFMS::SNMPServer::data_producer, \&PandoraFMS::SNMPServer::data_consumer, $dbh); @@ -119,6 +124,11 @@ sub run ($) { # Set the initial date for storm protection. $pa_config->{"__storm_ref__"} = time(); + # Set a server-specific period. + if ($pa_config->{'snmpconsole_threshold'} > 0) { + $self->setPeriod($pa_config->{'snmpconsole_threshold'}); + } + $self->setNumThreads ($pa_config->{'snmpconsole_threads'}); $self->SUPER::run (\@TaskQueue, \%PendingTasks, $Sem, $TaskSem); } @@ -130,7 +140,9 @@ sub data_producer ($) { my $self = shift; my ($pa_config, $dbh) = ($self->getConfig (), $self->getDBH ()); + my %tasks_by_source; my @tasks; + my @buffer; # Reset storm protection counters my $curr_time = time (); @@ -139,7 +151,12 @@ sub data_producer ($) { %AGENTS = (); } - for my $fs (($SNMPTRAPD, $DATASERVER)) { + # Make a local copy of locked sources. + $SourceSem->down (); + my $local_sources = {%Sources}; + $SourceSem->up (); + + for my $fs (($BUFFER, $SNMPTRAPD, $DATASERVER)) { next unless defined($fs->{'fd'}); reset_if_truncated($pa_config, $fs); while (my $line_with_pos = read_snmplogfile($fs)) { @@ -151,10 +168,11 @@ sub data_producer ($) { chomp ($line); # Update index file - open(my $idxfd, '>' . $fs->{'idx_file'}); - print $idxfd $fs->{'last_line'} . ' ' . $fs->{'last_size'}; - close $idxfd; - set_file_permissions($pa_config, $fs->{'idx_file'}, "0666"); + if (defined($fs->{'idx_file'})) { + open(my $idxfd, '>' . $fs->{'idx_file'}); + print $idxfd $fs->{'last_line'} . ' ' . $fs->{'last_size'}; + close $idxfd; + } # Skip lines other than SNMP Trap logs next unless ($line =~ m/^SNMPv[12]\[\*\*\]/); @@ -189,10 +207,18 @@ sub data_producer ($) { next; } - push (@tasks, $line); + # Either buffer or process the trap. + if (source_lock($pa_config, $source, $local_sources) == 0) { + push(@buffer, $line); + } else { + push (@tasks, $line); + } } } + # Save the buffer for the next run. + $BUFFER->{'fd'} = \@buffer; + return @tasks; } @@ -201,8 +227,18 @@ sub data_producer ($) { ############################################################################### sub data_consumer ($$) { my ($self, $task) = @_; + my ($pa_config, $server_id, $dbh) = ($self->getConfig(), $self->getServerID(), $self->getDBH()); - pandora_snmptrapd ($self->getConfig (), $task, $self->getServerID (), $self->getDBH ()); + pandora_snmptrapd ($pa_config, $task, $server_id, $dbh); + + # Unlock. + if ($pa_config->{'snmpconsole_lock'} == 1) { + my ($ver, $date, $time, $source, $null) = split(/\[\*\*\]/, $task, 5); + if ($ver eq "SNMPv2" || $pa_config->{'snmp_pdu_address'} eq '1' ) { + $source =~ s/(?:(?:TCP|UDP):\s*)?\[?([^] ]+)\]?(?::-?\d+)?(?:\s*->.*)?$/$1/; + } + source_unlock($pa_config, $source); + } } ########################################################################## @@ -458,6 +494,15 @@ sub read_snmplogfile($) { my $line; my $pos; + # Reading from a temporary buffer. + if (ref($fs->{'fd'}) eq 'ARRAY') { + if ($#{$fs->{'fd'}} < 0) { + return undef; + } + + return [0, shift(@{$fs->{'fd'}})]; + } + if(defined($fs->{'read_ahead_line'})) { # Restore saved line $line = $fs->{'read_ahead_line'}; @@ -537,6 +582,10 @@ sub init_log_file($$$) { sub reset_if_truncated($$) { my ($pa_config, $fs) = @_; + if (!defined($fs->{'log_file'})) { + return; + } + my $log_size = (stat ($fs->{'log_file'}))[7]; # New SNMP log file found @@ -548,6 +597,45 @@ sub reset_if_truncated($$) { } } +########################################################################## +# Get a lock on the given source. Return 1 on success, 0 otherwise. +########################################################################## +sub source_lock($$$) { + my ($pa_config, $source, $local_sources) = @_; + + # Locking is disabled. + if ($pa_config->{'snmpconsole_lock'} == 0) { + return 1; + } + + if (defined($local_sources->{$source})) { + return 0; + } + + $local_sources->{$source} = 1; + $SourceSem->down (); + $Sources{$source} = 1; + $SourceSem->up (); + + return 1; +} + +########################################################################## +# Remove the lock on the given source. +########################################################################## +sub source_unlock { + my ($pa_config, $source) = @_; + + # Locking is disabled. + if ($pa_config->{'snmpconsole_lock'} == 0) { + return; + } + + $SourceSem->down (); + delete ($Sources{$source}); + $SourceSem->up (); +} + ############################################################################### # Clean-up when the server is destroyed. ############################################################################### diff --git a/pandora_server/lib/PandoraFMS/Server.pm b/pandora_server/lib/PandoraFMS/Server.pm index d3587556d8..6312c2be41 100644 --- a/pandora_server/lib/PandoraFMS/Server.pm +++ b/pandora_server/lib/PandoraFMS/Server.pm @@ -46,12 +46,16 @@ sub new ($$$;$) { _num_threads => 1, _threads => [], _queue_size => 0, - _errstr => '' + _errstr => '', + _period => 0 }; # Share variables that may be set from different threads share ($self->{'_queue_size'}); share ($self->{'_errstr'}); + + # Set the default period. + $self->{'_period'} = $self->{'_pa_config'}->{'server_threshold'}; bless $self, $class; return $self; @@ -185,6 +189,24 @@ sub getErrStr ($) { return $self->{'_errstr'}; } +######################################################################################## +# Get period. +######################################################################################## +sub getPeriod ($) { + my $self = shift; + + return $self->{'_period'}; +} + +######################################################################################## +# Set period. +######################################################################################## +sub setPeriod ($$) { + my ($self, $period) = @_; + + $self->{'_period'} = $period; +} + ######################################################################################## # Set event storm protection. ########################################################################################