2008-04-22 Ramon Novoa <rnovoa@artica.es>

* bin/pandora_server: The data server now uses the producer/consumer
          model, like the rest of the servers. .checksum files are no longer
          used.




git-svn-id: https://svn.code.sf.net/p/pandora/code/trunk@813 c3f86ba8-e40f-0410-aaad-9ba5e7f4b01f
This commit is contained in:
Ramon Novoa 2008-04-22 13:49:22 +00:00
parent 0d1fbb542f
commit 3dccbc5610
2 changed files with 195 additions and 96 deletions

View File

@ -1,3 +1,8 @@
2008-04-22 Ramon Novoa <rnovoa@artica.es>
* bin/pandora_server: The data server now uses the producer/consumer
model, like the rest of the servers. .checksum files are no longer
used.
2008-04-17 Sancho Lerena <slerena@artica.es>

View File

@ -35,11 +35,21 @@ use PandoraFMS::Config;
use PandoraFMS::Tools;
use PandoraFMS::DB;
# Queue management
my @pending_task : shared;
my %pending_task_hash : shared;
my %active_task_hash : shared;
my %incomplete_task_hash : shared;
my $queue_lock : shared;
# FLUSH in each IO, only for DEBUG, very slow !
$| = 0;
my %pa_config;
$SIG{'TERM'} = 'pandora_shutdown';
$SIG{'INT'} = 'pandora_shutdown';
# Init main loop
pandora_init(\%pa_config,"Pandora FMS Data Server");
@ -57,111 +67,195 @@ if ( $pa_config{"daemon"} eq "1" ){
&pandora_daemonize ( \%pa_config);
}
# Launch all data_consumer threads
for (my $ax=0; $ax < $pa_config{'plugin_threads'}; $ax++){
threads->new( \&pandora_data_consumer, \%pa_config, $ax);
}
# Launch producer thread
threads->new( \&pandora_data_producer, \%pa_config);
if ($pa_config{"quiet"} == 0){
print " [*] All threads loaded and running \n\n";
}
# Start logging
pandora_startlog (\%pa_config);
my $dbhost = $pa_config{'dbhost'};
my $dbname = $pa_config{'dbname'};
my $dbh = DBI->connect("DBI:mysql:$dbname:$dbhost:3306",
$pa_config{'dbuser'},
$pa_config{'dbpass'},
{ RaiseError => 1, AutoCommit => 1 });
# KeepAlive checks for Agents, only for master servers, in separate thread
threads->new( \&pandora_keepalived, \%pa_config);
# Module processor subsystem
pandora_dataserver (\%pa_config);
##########################################################################
# Main loop
##########################################################################
sub pandora_dataserver {
my $pa_config = $_[0];
my $file_data;
my $file_md5;
my @file_list;
my $onefile; # Each item of incoming directory
my $agent_filename;
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 });
while ( 1 ) { # Pandora module processor main loop
opendir(DIR, $pa_config->{'incomingdir'} ) or die "[FATAL] Cannot open Incoming data directory at $pa_config->{'incomingdir'}: $!";
while (defined($onefile = readdir(DIR))){
if (($onefile =~ /^[a-zA-Z0-9]*/) && ( ((stat($pa_config->{'incomingdir'}."/".$onefile))[7]) > 0 )) {
push @file_list,$onefile; # Push in a stack all directory entries for this loop
}
}
while (defined($onefile = pop @file_list)) { # Begin to process files
threads->yield;
$file_data = "$pa_config->{'incomingdir'}/$onefile";
#next if $onefile =~ /^\.\.?$/; # Skip . and .. directory
# First filter any file that doesnt like ".data"
if ( $onefile =~ /([\-\:\;\.\,\_\s\a\*\=\(\)a-zA-Z0-9]*).data\z/ ) {
$agent_filename = $1;
$file_md5 = "$pa_config->{'incomingdir'}/$agent_filename.checksum";
# If check is disabled, ignore if file_md5 exists
if (( -e $file_md5 ) or ($pa_config->{'pandora_check'} == 0)){
# Verify integrity
my $check_result;
$check_result = md5check ($file_data,$file_md5);
if (($pa_config->{'pandora_check'} == 0) || ($check_result == 1)){
# PERL cannot "free" memory on user demmand, so
# we are declaring $config hash reference in inner loop
# to force PERL system to realloc memory in each loop.
# In Pandora 1.1 in "standard" PERL Implementations, we could
# have a memory leak problem. This is solved now :-)
# Source : http://www.rocketaware.com/perl/perlfaq3/
# Procesa_Datos its the main function to process datafile
my $config; # Hash Reference, used to store XML data
# But first we needed to verify integrity of data file
if ($pa_config->{'pandora_check'} == 1){
logger ($pa_config, "Integrity of Datafile using MD5 is verified: $file_data",3);
}
eval { # XML Processing error catching procedure. Critical due XML was no validated
logger ($pa_config, "Ready to parse $file_data",4);
$config = XMLin($file_data, forcearray=>'module');
};
if ($@) {
logger ($pa_config, "[ERROR] Error processing XML contents in $file_data",0);
logger ($pa_config, "[ERROR] $@", 0);
copy ($file_data,$file_data."_BADXML");
if (($pa_config->{'pandora_check'} == 1) && ( -e $file_md5 )) {
copy ($file_md5,$file_md5."_BADCHECKSUM");
}
}
procesa_datos ($pa_config, $config, $dbh);
undef $config;
# If _everything_ its ok..
# delete files
unlink ($file_data);
if ( -e $file_md5 ) {
unlink ($file_md5);
}
} else { # md5 check fails
logger ( $pa_config, "[ERROR] MD5 Checksum failed! for $file_data",0);
# delete files
unlink ($file_data);
if ( -e $file_md5 ) {
unlink ($file_md5);
}
}
} # No checksum file, ignore file
}
}
closedir(DIR);
while (1) {
keep_alive_check (\%pa_config, $dbh);
pandora_serverkeepaliver (\%pa_config, 0, $dbh);
threads->yield;
sleep $pa_config->{"server_threshold"};
}
} # End of main loop function
sleep ($pa_config{"server_threshold"});
}
##########################################################################
## SUB pandora_keepalived
## Pandora Keepalive alert daemon subsystem
##########################################################################
########################################################################################
# pandora_shutdown ()
# Close system on a received signal
########################################################################################
sub pandora_shutdown {
logger (\%pa_config,"Pandora FMS Data Server Shutdown by signal ",0);
print " [*] Shutting down Pandora FMS Data Server (received signal)...\n";
exit;
}
sub pandora_keepalived {
###############################################################################
# pandora_data_producer ()
# Queue data files available for processing
###############################################################################
sub pandora_data_producer {
my $pa_config = $_[0];
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 });
while ( 1 ){
sleep $pa_config->{"server_threshold"};
my $file_name;
my $file;
# Main loop
while(1) {
# Read all files in the incoming directory
opendir(DIR, $pa_config->{'incomingdir'} )
|| die "[FATAL] Cannot open Incoming data directory at " .
$pa_config->{'incomingdir'} . ": $!";
while (defined($file_name = readdir(DIR))){
# For backward compatibility
if ($file_name =~ /^.*\.checksum$/) {
unlink("$pa_config->{'incomingdir'}/$file_name");
next;
}
# Data files have the extension .data
if ($file_name !~ /^.*\.data$/) {
next;
}
# Skip already queued/processed files
if (defined($pending_task_hash{$file_name}) ||
defined($active_task_hash{$file_name})) {
next;
}
# Queue data file
{
lock $queue_lock;
push (@pending_task, $file_name);
$pending_task_hash {$file_name} = 1;
if (! defined($incomplete_task_hash{$file_name})) {
$incomplete_task_hash{$file_name} = 0;
}
}
threads->yield;
}
closedir(DIR);
threads->yield;
keep_alive_check ($pa_config, $dbh);
pandora_serverkeepaliver ($pa_config, 0, $dbh); # 0 for dataserver
sleep $pa_config->{"server_threshold"};
}
}
###############################################################################
# pandora_data_consumer ()
# Process data files
###############################################################################
sub pandora_data_consumer ($$) {
my $pa_config = $_[0];
my $thread_id = $_[1];
my $file_name;
my $counter =0;
if ($pa_config->{"quiet"} == 0){
print " [*] Starting up Data Consumer Thread # $thread_id \n";
}
# Create database handler
my $dbh = DBI->connect("DBI:mysql:" . $pa_config->{'dbname'} . ":" .
$pa_config->{'dbhost'} . ":3306",
$pa_config->{'dbuser'},$pa_config->{'dbpass'},
{ RaiseError => 1, AutoCommit => 1 });
LOOP: while (1) {
if ($counter > 10) {
$counter = 0;
sleep (1);
}
# Check for pending data files
{
lock $queue_lock;
if (scalar(@pending_task) == 0) {
$counter++;
next LOOP;
}
$file_name = shift(@pending_task);
delete($pending_task_hash{$file_name});
$active_task_hash{$file_name} = 1;
}
my $file = "$pa_config->{'incomingdir'}/$file_name";
# Check file really exists to avoid race conditions
if (! -e "$file") {
$counter++;
next LOOP;
}
my $data;
# Parse the XML file
eval {
logger ($pa_config, "Ready to parse $file_name",4);
$data = XMLin($file, forcearray=>'module');
};
# Invalid MXL
if ($@) {
# Retry
if ($incomplete_task_hash{$file_name} < 10) {
{
lock $queue_lock;
delete($active_task_hash{$file_name});
$incomplete_task_hash{$file_name} += 1;
}
}
# Discard
else {
{
lock $queue_lock;
delete($active_task_hash{$file_name});
delete($incomplete_task_hash{$file_name});
rename($file, $file . "_BADXML");
}
}
$counter = 0;
next LOOP;
}
procesa_datos ($pa_config, $data, $dbh);
{
lock $queue_lock;
delete($active_task_hash{$file_name});
delete($incomplete_task_hash{$file_name});
unlink($file);
}
$counter = 0;
}
}
##########################################################################
## SUB keep_alive_check ()