2008-04-22 Ramon Novoa <rnovoa@artica.es>
* bin/pandora_server: The data server now uses the producer/consumer model, like the rest of the servers. .checksum files are no longer used. git-svn-id: https://svn.code.sf.net/p/pandora/code/trunk@813 c3f86ba8-e40f-0410-aaad-9ba5e7f4b01f
This commit is contained in:
parent
c1b2991cd1
commit
ea0fdae0f9
|
@ -1,3 +1,8 @@
|
|||
2008-04-22 Ramon Novoa <rnovoa@artica.es>
|
||||
|
||||
* bin/pandora_server: The data server now uses the producer/consumer
|
||||
model, like the rest of the servers. .checksum files are no longer
|
||||
used.
|
||||
|
||||
2008-04-17 Sancho Lerena <slerena@artica.es>
|
||||
|
||||
|
|
|
@ -35,11 +35,21 @@ use PandoraFMS::Config;
|
|||
use PandoraFMS::Tools;
|
||||
use PandoraFMS::DB;
|
||||
|
||||
# Queue management
|
||||
my @pending_task : shared;
|
||||
my %pending_task_hash : shared;
|
||||
my %active_task_hash : shared;
|
||||
my %incomplete_task_hash : shared;
|
||||
my $queue_lock : shared;
|
||||
|
||||
# FLUSH in each IO, only for DEBUG, very slow !
|
||||
$| = 0;
|
||||
|
||||
my %pa_config;
|
||||
|
||||
$SIG{'TERM'} = 'pandora_shutdown';
|
||||
$SIG{'INT'} = 'pandora_shutdown';
|
||||
|
||||
# Init main loop
|
||||
pandora_init(\%pa_config,"Pandora FMS Data Server");
|
||||
|
||||
|
@ -57,111 +67,195 @@ if ( $pa_config{"daemon"} eq "1" ){
|
|||
&pandora_daemonize ( \%pa_config);
|
||||
}
|
||||
|
||||
# Launch all data_consumer threads
|
||||
for (my $ax=0; $ax < $pa_config{'plugin_threads'}; $ax++){
|
||||
threads->new( \&pandora_data_consumer, \%pa_config, $ax);
|
||||
}
|
||||
|
||||
# Launch producer thread
|
||||
threads->new( \&pandora_data_producer, \%pa_config);
|
||||
|
||||
if ($pa_config{"quiet"} == 0){
|
||||
print " [*] All threads loaded and running \n\n";
|
||||
}
|
||||
|
||||
# Start logging
|
||||
pandora_startlog (\%pa_config);
|
||||
|
||||
my $dbhost = $pa_config{'dbhost'};
|
||||
my $dbname = $pa_config{'dbname'};
|
||||
my $dbh = DBI->connect("DBI:mysql:$dbname:$dbhost:3306",
|
||||
$pa_config{'dbuser'},
|
||||
$pa_config{'dbpass'},
|
||||
{ RaiseError => 1, AutoCommit => 1 });
|
||||
|
||||
# KeepAlive checks for Agents, only for master servers, in separate thread
|
||||
threads->new( \&pandora_keepalived, \%pa_config);
|
||||
|
||||
# Module processor subsystem
|
||||
pandora_dataserver (\%pa_config);
|
||||
|
||||
##########################################################################
|
||||
# Main loop
|
||||
##########################################################################
|
||||
|
||||
sub pandora_dataserver {
|
||||
my $pa_config = $_[0];
|
||||
my $file_data;
|
||||
my $file_md5;
|
||||
my @file_list;
|
||||
my $onefile; # Each item of incoming directory
|
||||
my $agent_filename;
|
||||
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 });
|
||||
|
||||
while ( 1 ) { # Pandora module processor main loop
|
||||
opendir(DIR, $pa_config->{'incomingdir'} ) or die "[FATAL] Cannot open Incoming data directory at $pa_config->{'incomingdir'}: $!";
|
||||
while (defined($onefile = readdir(DIR))){
|
||||
if (($onefile =~ /^[a-zA-Z0-9]*/) && ( ((stat($pa_config->{'incomingdir'}."/".$onefile))[7]) > 0 )) {
|
||||
push @file_list,$onefile; # Push in a stack all directory entries for this loop
|
||||
}
|
||||
}
|
||||
while (defined($onefile = pop @file_list)) { # Begin to process files
|
||||
threads->yield;
|
||||
$file_data = "$pa_config->{'incomingdir'}/$onefile";
|
||||
#next if $onefile =~ /^\.\.?$/; # Skip . and .. directory
|
||||
# First filter any file that doesnt like ".data"
|
||||
if ( $onefile =~ /([\-\:\;\.\,\_\s\a\*\=\(\)a-zA-Z0-9]*).data\z/ ) {
|
||||
$agent_filename = $1;
|
||||
$file_md5 = "$pa_config->{'incomingdir'}/$agent_filename.checksum";
|
||||
# If check is disabled, ignore if file_md5 exists
|
||||
if (( -e $file_md5 ) or ($pa_config->{'pandora_check'} == 0)){
|
||||
# Verify integrity
|
||||
my $check_result;
|
||||
$check_result = md5check ($file_data,$file_md5);
|
||||
if (($pa_config->{'pandora_check'} == 0) || ($check_result == 1)){
|
||||
# PERL cannot "free" memory on user demmand, so
|
||||
# we are declaring $config hash reference in inner loop
|
||||
# to force PERL system to realloc memory in each loop.
|
||||
# In Pandora 1.1 in "standard" PERL Implementations, we could
|
||||
# have a memory leak problem. This is solved now :-)
|
||||
# Source : http://www.rocketaware.com/perl/perlfaq3/
|
||||
# Procesa_Datos its the main function to process datafile
|
||||
my $config; # Hash Reference, used to store XML data
|
||||
# But first we needed to verify integrity of data file
|
||||
if ($pa_config->{'pandora_check'} == 1){
|
||||
logger ($pa_config, "Integrity of Datafile using MD5 is verified: $file_data",3);
|
||||
}
|
||||
eval { # XML Processing error catching procedure. Critical due XML was no validated
|
||||
logger ($pa_config, "Ready to parse $file_data",4);
|
||||
$config = XMLin($file_data, forcearray=>'module');
|
||||
};
|
||||
if ($@) {
|
||||
logger ($pa_config, "[ERROR] Error processing XML contents in $file_data",0);
|
||||
logger ($pa_config, "[ERROR] $@", 0);
|
||||
copy ($file_data,$file_data."_BADXML");
|
||||
if (($pa_config->{'pandora_check'} == 1) && ( -e $file_md5 )) {
|
||||
copy ($file_md5,$file_md5."_BADCHECKSUM");
|
||||
}
|
||||
}
|
||||
procesa_datos ($pa_config, $config, $dbh);
|
||||
undef $config;
|
||||
# If _everything_ its ok..
|
||||
# delete files
|
||||
unlink ($file_data);
|
||||
if ( -e $file_md5 ) {
|
||||
unlink ($file_md5);
|
||||
}
|
||||
} else { # md5 check fails
|
||||
logger ( $pa_config, "[ERROR] MD5 Checksum failed! for $file_data",0);
|
||||
# delete files
|
||||
unlink ($file_data);
|
||||
if ( -e $file_md5 ) {
|
||||
unlink ($file_md5);
|
||||
}
|
||||
}
|
||||
} # No checksum file, ignore file
|
||||
}
|
||||
}
|
||||
closedir(DIR);
|
||||
while (1) {
|
||||
keep_alive_check (\%pa_config, $dbh);
|
||||
pandora_serverkeepaliver (\%pa_config, 0, $dbh);
|
||||
threads->yield;
|
||||
sleep $pa_config->{"server_threshold"};
|
||||
}
|
||||
} # End of main loop function
|
||||
sleep ($pa_config{"server_threshold"});
|
||||
}
|
||||
|
||||
##########################################################################
|
||||
## SUB pandora_keepalived
|
||||
## Pandora Keepalive alert daemon subsystem
|
||||
##########################################################################
|
||||
########################################################################################
|
||||
# pandora_shutdown ()
|
||||
# Close system on a received signal
|
||||
########################################################################################
|
||||
sub pandora_shutdown {
|
||||
logger (\%pa_config,"Pandora FMS Data Server Shutdown by signal ",0);
|
||||
print " [*] Shutting down Pandora FMS Data Server (received signal)...\n";
|
||||
exit;
|
||||
}
|
||||
|
||||
sub pandora_keepalived {
|
||||
###############################################################################
|
||||
# pandora_data_producer ()
|
||||
# Queue data files available for processing
|
||||
###############################################################################
|
||||
sub pandora_data_producer {
|
||||
my $pa_config = $_[0];
|
||||
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306",$pa_config->{"dbuser"}, $pa_config->{"dbpass"},{ RaiseError => 1, AutoCommit => 1 });
|
||||
while ( 1 ){
|
||||
sleep $pa_config->{"server_threshold"};
|
||||
my $file_name;
|
||||
my $file;
|
||||
|
||||
# Main loop
|
||||
while(1) {
|
||||
|
||||
# Read all files in the incoming directory
|
||||
opendir(DIR, $pa_config->{'incomingdir'} )
|
||||
|| die "[FATAL] Cannot open Incoming data directory at " .
|
||||
$pa_config->{'incomingdir'} . ": $!";
|
||||
|
||||
while (defined($file_name = readdir(DIR))){
|
||||
|
||||
# For backward compatibility
|
||||
if ($file_name =~ /^.*\.checksum$/) {
|
||||
unlink("$pa_config->{'incomingdir'}/$file_name");
|
||||
next;
|
||||
}
|
||||
|
||||
# Data files have the extension .data
|
||||
if ($file_name !~ /^.*\.data$/) {
|
||||
next;
|
||||
}
|
||||
|
||||
# Skip already queued/processed files
|
||||
if (defined($pending_task_hash{$file_name}) ||
|
||||
defined($active_task_hash{$file_name})) {
|
||||
next;
|
||||
}
|
||||
|
||||
# Queue data file
|
||||
{
|
||||
lock $queue_lock;
|
||||
push (@pending_task, $file_name);
|
||||
$pending_task_hash {$file_name} = 1;
|
||||
if (! defined($incomplete_task_hash{$file_name})) {
|
||||
$incomplete_task_hash{$file_name} = 0;
|
||||
}
|
||||
}
|
||||
|
||||
threads->yield;
|
||||
}
|
||||
|
||||
closedir(DIR);
|
||||
threads->yield;
|
||||
keep_alive_check ($pa_config, $dbh);
|
||||
pandora_serverkeepaliver ($pa_config, 0, $dbh); # 0 for dataserver
|
||||
sleep $pa_config->{"server_threshold"};
|
||||
}
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
# pandora_data_consumer ()
|
||||
# Process data files
|
||||
###############################################################################
|
||||
sub pandora_data_consumer ($$) {
|
||||
my $pa_config = $_[0];
|
||||
my $thread_id = $_[1];
|
||||
|
||||
my $file_name;
|
||||
my $counter =0;
|
||||
|
||||
if ($pa_config->{"quiet"} == 0){
|
||||
print " [*] Starting up Data Consumer Thread # $thread_id \n";
|
||||
}
|
||||
|
||||
# Create database handler
|
||||
my $dbh = DBI->connect("DBI:mysql:" . $pa_config->{'dbname'} . ":" .
|
||||
$pa_config->{'dbhost'} . ":3306",
|
||||
$pa_config->{'dbuser'},$pa_config->{'dbpass'},
|
||||
{ RaiseError => 1, AutoCommit => 1 });
|
||||
|
||||
LOOP: while (1) {
|
||||
|
||||
if ($counter > 10) {
|
||||
$counter = 0;
|
||||
sleep (1);
|
||||
}
|
||||
|
||||
# Check for pending data files
|
||||
{
|
||||
lock $queue_lock;
|
||||
if (scalar(@pending_task) == 0) {
|
||||
$counter++;
|
||||
next LOOP;
|
||||
}
|
||||
|
||||
$file_name = shift(@pending_task);
|
||||
delete($pending_task_hash{$file_name});
|
||||
$active_task_hash{$file_name} = 1;
|
||||
}
|
||||
|
||||
my $file = "$pa_config->{'incomingdir'}/$file_name";
|
||||
|
||||
# Check file really exists to avoid race conditions
|
||||
if (! -e "$file") {
|
||||
$counter++;
|
||||
next LOOP;
|
||||
}
|
||||
|
||||
my $data;
|
||||
|
||||
# Parse the XML file
|
||||
eval {
|
||||
logger ($pa_config, "Ready to parse $file_name",4);
|
||||
$data = XMLin($file, forcearray=>'module');
|
||||
};
|
||||
|
||||
# Invalid MXL
|
||||
if ($@) {
|
||||
|
||||
# Retry
|
||||
if ($incomplete_task_hash{$file_name} < 10) {
|
||||
{
|
||||
lock $queue_lock;
|
||||
delete($active_task_hash{$file_name});
|
||||
$incomplete_task_hash{$file_name} += 1;
|
||||
}
|
||||
}
|
||||
# Discard
|
||||
else {
|
||||
{
|
||||
lock $queue_lock;
|
||||
delete($active_task_hash{$file_name});
|
||||
delete($incomplete_task_hash{$file_name});
|
||||
rename($file, $file . "_BADXML");
|
||||
}
|
||||
}
|
||||
|
||||
$counter = 0;
|
||||
next LOOP;
|
||||
}
|
||||
|
||||
procesa_datos ($pa_config, $data, $dbh);
|
||||
|
||||
{
|
||||
lock $queue_lock;
|
||||
delete($active_task_hash{$file_name});
|
||||
delete($incomplete_task_hash{$file_name});
|
||||
unlink($file);
|
||||
}
|
||||
|
||||
$counter = 0;
|
||||
}
|
||||
}
|
||||
|
||||
##########################################################################
|
||||
## SUB keep_alive_check ()
|
||||
|
|
Loading…
Reference in New Issue