From 3dccbc5610988353eaaf03d67f673efe13e38517 Mon Sep 17 00:00:00 2001 From: Ramon Novoa Date: Tue, 22 Apr 2008 13:49:22 +0000 Subject: [PATCH] 2008-04-22 Ramon Novoa * bin/pandora_server: The data server now uses the producer/consumer model, like the rest of the servers. .checksum files are no longer used. git-svn-id: https://svn.code.sf.net/p/pandora/code/trunk@813 c3f86ba8-e40f-0410-aaad-9ba5e7f4b01f --- pandora_server/ChangeLog | 5 + pandora_server/bin/pandora_server | 286 ++++++++++++++++++++---------- 2 files changed, 195 insertions(+), 96 deletions(-) diff --git a/pandora_server/ChangeLog b/pandora_server/ChangeLog index eba14477de..a3d6f073db 100644 --- a/pandora_server/ChangeLog +++ b/pandora_server/ChangeLog @@ -1,3 +1,8 @@ +2008-04-22 Ramon Novoa + + * bin/pandora_server: The data server now uses the producer/consumer + model, like the rest of the servers. .checksum files are no longer + used. 2008-04-17 Sancho Lerena diff --git a/pandora_server/bin/pandora_server b/pandora_server/bin/pandora_server index caafb7fd61..d78f359351 100755 --- a/pandora_server/bin/pandora_server +++ b/pandora_server/bin/pandora_server @@ -35,11 +35,21 @@ use PandoraFMS::Config; use PandoraFMS::Tools; use PandoraFMS::DB; +# Queue management +my @pending_task : shared; +my %pending_task_hash : shared; +my %active_task_hash : shared; +my %incomplete_task_hash : shared; +my $queue_lock : shared; + # FLUSH in each IO, only for DEBUG, very slow ! $| = 0; my %pa_config; +$SIG{'TERM'} = 'pandora_shutdown'; +$SIG{'INT'} = 'pandora_shutdown'; + # Init main loop pandora_init(\%pa_config,"Pandora FMS Data Server"); @@ -57,111 +67,195 @@ if ( $pa_config{"daemon"} eq "1" ){ &pandora_daemonize ( \%pa_config); } +# Launch all data_consumer threads +for (my $ax=0; $ax < $pa_config{'plugin_threads'}; $ax++){ + threads->new( \&pandora_data_consumer, \%pa_config, $ax); +} + +# Launch producer thread +threads->new( \&pandora_data_producer, \%pa_config); + +if ($pa_config{"quiet"} == 0){ + print " [*] All threads loaded and running \n\n"; +} + +# Start logging +pandora_startlog (\%pa_config); + +my $dbhost = $pa_config{'dbhost'}; +my $dbname = $pa_config{'dbname'}; +my $dbh = DBI->connect("DBI:mysql:$dbname:$dbhost:3306", + $pa_config{'dbuser'}, + $pa_config{'dbpass'}, + { RaiseError => 1, AutoCommit => 1 }); + # KeepAlive checks for Agents, only for master servers, in separate thread -threads->new( \&pandora_keepalived, \%pa_config); - -# Module processor subsystem -pandora_dataserver (\%pa_config); - -########################################################################## -# Main loop -########################################################################## - -sub pandora_dataserver { - my $pa_config = $_[0]; - my $file_data; - my $file_md5; - my @file_list; - my $onefile; # Each item of incoming directory - my $agent_filename; - my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 }); - - while ( 1 ) { # Pandora module processor main loop - opendir(DIR, $pa_config->{'incomingdir'} ) or die "[FATAL] Cannot open Incoming data directory at $pa_config->{'incomingdir'}: $!"; - while (defined($onefile = readdir(DIR))){ - if (($onefile =~ /^[a-zA-Z0-9]*/) && ( ((stat($pa_config->{'incomingdir'}."/".$onefile))[7]) > 0 )) { - push @file_list,$onefile; # Push in a stack all directory entries for this loop - } - } - while (defined($onefile = pop @file_list)) { # Begin to process files - threads->yield; - $file_data = "$pa_config->{'incomingdir'}/$onefile"; - #next if $onefile =~ /^\.\.?$/; # Skip . and .. directory - # First filter any file that doesnt like ".data" - if ( $onefile =~ /([\-\:\;\.\,\_\s\a\*\=\(\)a-zA-Z0-9]*).data\z/ ) { - $agent_filename = $1; - $file_md5 = "$pa_config->{'incomingdir'}/$agent_filename.checksum"; - # If check is disabled, ignore if file_md5 exists - if (( -e $file_md5 ) or ($pa_config->{'pandora_check'} == 0)){ - # Verify integrity - my $check_result; - $check_result = md5check ($file_data,$file_md5); - if (($pa_config->{'pandora_check'} == 0) || ($check_result == 1)){ - # PERL cannot "free" memory on user demmand, so - # we are declaring $config hash reference in inner loop - # to force PERL system to realloc memory in each loop. - # In Pandora 1.1 in "standard" PERL Implementations, we could - # have a memory leak problem. This is solved now :-) - # Source : http://www.rocketaware.com/perl/perlfaq3/ - # Procesa_Datos its the main function to process datafile - my $config; # Hash Reference, used to store XML data - # But first we needed to verify integrity of data file - if ($pa_config->{'pandora_check'} == 1){ - logger ($pa_config, "Integrity of Datafile using MD5 is verified: $file_data",3); - } - eval { # XML Processing error catching procedure. Critical due XML was no validated - logger ($pa_config, "Ready to parse $file_data",4); - $config = XMLin($file_data, forcearray=>'module'); - }; - if ($@) { - logger ($pa_config, "[ERROR] Error processing XML contents in $file_data",0); - logger ($pa_config, "[ERROR] $@", 0); - copy ($file_data,$file_data."_BADXML"); - if (($pa_config->{'pandora_check'} == 1) && ( -e $file_md5 )) { - copy ($file_md5,$file_md5."_BADCHECKSUM"); - } - } - procesa_datos ($pa_config, $config, $dbh); - undef $config; - # If _everything_ its ok.. - # delete files - unlink ($file_data); - if ( -e $file_md5 ) { - unlink ($file_md5); - } - } else { # md5 check fails - logger ( $pa_config, "[ERROR] MD5 Checksum failed! for $file_data",0); - # delete files - unlink ($file_data); - if ( -e $file_md5 ) { - unlink ($file_md5); - } - } - } # No checksum file, ignore file - } - } - closedir(DIR); +while (1) { + keep_alive_check (\%pa_config, $dbh); + pandora_serverkeepaliver (\%pa_config, 0, $dbh); threads->yield; - sleep $pa_config->{"server_threshold"}; - } -} # End of main loop function + sleep ($pa_config{"server_threshold"}); +} -########################################################################## -## SUB pandora_keepalived -## Pandora Keepalive alert daemon subsystem -########################################################################## +######################################################################################## +# pandora_shutdown () +# Close system on a received signal +######################################################################################## +sub pandora_shutdown { + logger (\%pa_config,"Pandora FMS Data Server Shutdown by signal ",0); + print " [*] Shutting down Pandora FMS Data Server (received signal)...\n"; + exit; +} -sub pandora_keepalived { +############################################################################### +# pandora_data_producer () +# Queue data files available for processing +############################################################################### +sub pandora_data_producer { my $pa_config = $_[0]; - my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 }); - while ( 1 ){ - sleep $pa_config->{"server_threshold"}; + my $file_name; + my $file; + + # Main loop + while(1) { + + # Read all files in the incoming directory + opendir(DIR, $pa_config->{'incomingdir'} ) + || die "[FATAL] Cannot open Incoming data directory at " . + $pa_config->{'incomingdir'} . ": $!"; + + while (defined($file_name = readdir(DIR))){ + + # For backward compatibility + if ($file_name =~ /^.*\.checksum$/) { + unlink("$pa_config->{'incomingdir'}/$file_name"); + next; + } + + # Data files have the extension .data + if ($file_name !~ /^.*\.data$/) { + next; + } + + # Skip already queued/processed files + if (defined($pending_task_hash{$file_name}) || + defined($active_task_hash{$file_name})) { + next; + } + + # Queue data file + { + lock $queue_lock; + push (@pending_task, $file_name); + $pending_task_hash {$file_name} = 1; + if (! defined($incomplete_task_hash{$file_name})) { + $incomplete_task_hash{$file_name} = 0; + } + } + + threads->yield; + } + + closedir(DIR); threads->yield; - keep_alive_check ($pa_config, $dbh); - pandora_serverkeepaliver ($pa_config, 0, $dbh); # 0 for dataserver + sleep $pa_config->{"server_threshold"}; } } +############################################################################### +# pandora_data_consumer () +# Process data files +############################################################################### +sub pandora_data_consumer ($$) { + my $pa_config = $_[0]; + my $thread_id = $_[1]; + + my $file_name; + my $counter =0; + + if ($pa_config->{"quiet"} == 0){ + print " [*] Starting up Data Consumer Thread # $thread_id \n"; + } + + # Create database handler + my $dbh = DBI->connect("DBI:mysql:" . $pa_config->{'dbname'} . ":" . + $pa_config->{'dbhost'} . ":3306", + $pa_config->{'dbuser'},$pa_config->{'dbpass'}, + { RaiseError => 1, AutoCommit => 1 }); + + LOOP: while (1) { + + if ($counter > 10) { + $counter = 0; + sleep (1); + } + + # Check for pending data files + { + lock $queue_lock; + if (scalar(@pending_task) == 0) { + $counter++; + next LOOP; + } + + $file_name = shift(@pending_task); + delete($pending_task_hash{$file_name}); + $active_task_hash{$file_name} = 1; + } + + my $file = "$pa_config->{'incomingdir'}/$file_name"; + + # Check file really exists to avoid race conditions + if (! -e "$file") { + $counter++; + next LOOP; + } + + my $data; + + # Parse the XML file + eval { + logger ($pa_config, "Ready to parse $file_name",4); + $data = XMLin($file, forcearray=>'module'); + }; + + # Invalid MXL + if ($@) { + + # Retry + if ($incomplete_task_hash{$file_name} < 10) { + { + lock $queue_lock; + delete($active_task_hash{$file_name}); + $incomplete_task_hash{$file_name} += 1; + } + } + # Discard + else { + { + lock $queue_lock; + delete($active_task_hash{$file_name}); + delete($incomplete_task_hash{$file_name}); + rename($file, $file . "_BADXML"); + } + } + + $counter = 0; + next LOOP; + } + + procesa_datos ($pa_config, $data, $dbh); + + { + lock $queue_lock; + delete($active_task_hash{$file_name}); + delete($incomplete_task_hash{$file_name}); + unlink($file); + } + + $counter = 0; + } +} ########################################################################## ## SUB keep_alive_check ()