- Fix race condition with Thread::Queue (replace the handler thread and its queues with a forked process and pipes)

git-svn-id: http://svn.merethis.net/centreon-esxd/trunk@9 a5eaa968-4c79-4d68-970d-af6011b5b055
This commit is contained in:
Quentin Garnier 2012-09-13 11:39:02 +00:00
parent 8f7c4cbea4
commit e72a7a42a1
1 changed files with 230 additions and 180 deletions

View File

@ -9,19 +9,15 @@ use VMware::VIRuntime;
use VMware::VILib;
use IO::Socket;
use Net::hostent; # for OOish version of gethostbyaddr
use threads;
use Thread::Queue;
use IO::Select;
use POSIX ":sys_wait_h";
use Data::Dumper;
use Time::HiRes;
use vars qw($port $service_url $username $password $TIMEOUT_VSPHERE $TIMEOUT $TIMEOUT_KILL $REFRESH_KEEPER_SESSION $LOG);
require '/etc/centreon/centreon_esxd.pm';
our $session_id;
our $data_queue;
our $response_queue;
our %sockets = ();
our %child_proc;
our %return_child;
@ -36,6 +32,15 @@ our $perfcounter_refreshrate = 20;
our $perfcounter_speriod = -1;
our $stop = 0;
our $counter_request_id = 0;
our $child_vpshere_pid;
our $read_select;
our $reader_pipe_one;
our $writer_pipe_one;
our $reader_pipe_two;
our $writer_pipe_two;
our $session1;
our $counter = 0;
our $global_id;
our %ERRORS = ( "OK" => 0, "WARNING" => 1, "CRITICAL" => 2, "UNKNOWN" => 3, "PENDING" => 4);
our %MYERRORS = (0 => "OK", 1 => "WARNING", 3 => "CRITICAL", 7 => "UNKNOWN");
@ -66,7 +71,10 @@ sub connect_vsphere {
eval {
$SIG{ALRM} = sub { die('TIMEOUT'); };
alarm($TIMEOUT_VSPHERE);
Vim::login(service_url=> $service_url, user_name => $username, password => $password);
$session1 = Vim->new(service_url => $service_url);
$session1->login(
user_name => $username,
password => $password);
alarm(0);
};
if($@) {
@ -75,16 +83,20 @@ sub connect_vsphere {
writeLogFile("Login to VirtualCentre server failed: $@");
return 1;
}
eval {
$session_id = Vim::get_session_id();
};
if($@) {
writeLogFile("Can't get session_id: $@\n");
return 1;
}
# eval {
# $session_id = Vim::get_session_id();
# };
# if($@) {
# writeLogFile("Can't get session_id: $@\n");
# return 1;
# }
return 0;
}
# Write one check response to STDOUT, prefixed with the current request id.
#
# The id comes from the package variable $global_id; the single argument is
# the response text (callers in this file pass "<status>|<message>\n").
# Output format is "<id>|<text>", with no extra newline appended here.
sub print_response {
    my ($payload) = @_;

    # Build the full line first so that print receives a single plain scalar.
    my $line = join('|', $global_id, $payload);
    print $line;
}
sub output_add($$$$) {
my ($output_str, $output_append, $delim, $str) = (shift, shift, shift, shift);
$$output_str .= $$output_append . $str;
@ -107,13 +119,13 @@ sub get_views {
my $results;
eval {
$results = Vim::get_views(mo_ref_array => $_[0], properties => $_[1]);
$results = $session1->get_views(mo_ref_array => $_[0], properties => $_[1]);
};
if ($@) {
writeLogFile("$@");
my $lerror = $@;
$lerror =~ s/\n/ /g;
print "-1|Error: " . $lerror . "\n";
print_response("-1|Error: " . $lerror . "\n");
return undef;
}
return $results;
@ -169,7 +181,7 @@ sub generic_performance_values_historic {
sub cache_perf_counters {
eval {
$perfmanager_view = Vim::get_view(mo_ref => Vim::get_service_content()->perfManager, properties => ['perfCounter', 'historicalInterval']);
$perfmanager_view = $session1->get_view(mo_ref => $session1->get_service_content()->perfManager, properties => ['perfCounter', 'historicalInterval']);
foreach (@{$perfmanager_view->perfCounter}) {
my $label = $_->groupInfo->key . "." . $_->nameInfo->key . "." . $_->rollupType->val;
$perfcounter_cache{$label} = {'key' => $_->key, 'unitkey' => $_->unitInfo->key};
@ -196,29 +208,29 @@ sub get_entities_host {
my $entity_views;
eval {
$entity_views = Vim::find_entity_views(view_type => $view_type, properties => $properties, filter => $filters);
$entity_views = $session1->find_entity_views(view_type => $view_type, properties => $properties, filter => $filters);
};
if ($@ =~ /decryption failed or bad record mac/) {
writeLogFile("$@");
eval {
$entity_views = Vim::find_entity_views(view_type => $view_type, properties => $properties, filter => $filters);
$entity_views = $session1->find_entity_views(view_type => $view_type, properties => $properties, filter => $filters);
};
if ($@) {
my $lerror = $@;
$lerror =~ s/\n/ /g;
print "-1|Error: " . $lerror . "\n";
print_response("-1|Error: " . Data::Dumper::Dumper($lerror) . "\n");
return undef;
}
} elsif ($@) {
writeLogFile("$@");
my $lerror = $@;
$lerror =~ s/\n/ /g;
print "-1|Error: " . $lerror . "\n";
print_response("-1|Error: " . $lerror . "\n");
return undef;
}
if (!@$entity_views) {
my $status |= $MYERRORS_MASK{'UNKNOWN'};
print $ERRORS{$MYERRORS{$status}} . "|Host does not exist.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|Host does not exist.\n");
return undef;
}
#eval {
@ -331,7 +343,7 @@ sub healthhost_do {
$output = "All $OKCount health checks are green";
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -419,7 +431,7 @@ sub datastores_do {
$output = "Datastore '$ds' not found or summary not accessible.";
$status |= $MYERRORS_MASK{'UNKNOWN'};
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -461,7 +473,7 @@ sub maintenancehost_do {
}
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -521,7 +533,7 @@ sub statushost_do {
}
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -560,7 +572,7 @@ sub cpuhost_do {
my ($lhost, $warn, $crit) = @_;
if (!($perfcounter_speriod > 0)) {
my $status |= $MYERRORS_MASK{'UNKNOWN'};
print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n");
return ;
}
@ -604,7 +616,7 @@ sub cpuhost_do {
$output .= " cpu$instance=" . simplify_number(convert_number($values->{$id}[0]) * 0.01) . "%;;0;100";
}
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -648,7 +660,7 @@ sub nethost_do {
my ($lhost, $pnic, $warn, $crit) = @_;
if (!($perfcounter_speriod > 0)) {
my $status |= $MYERRORS_MASK{'UNKNOWN'};
print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n");
return ;
}
@ -692,7 +704,7 @@ sub nethost_do {
$output = "Traffic In : " . simplify_number($traffic_in / 1024 * 8) . " Mb/s (" . simplify_number($traffic_in / 1024 * 8 * 100 / $pnic_def{$pnic}) . " %), Out : " . simplify_number($traffic_out / 1024 * 8) . " Mb/s (" . simplify_number($traffic_out / 1024 * 8 * 100 / $pnic_def{$pnic}) . " %)";
$output .= "|traffic_in=" . ($traffic_in * 1024 * 8) . "b/s traffic_out=" . (($traffic_out * 1024 * 8)) . "b/s";
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -731,7 +743,7 @@ sub memhost_do {
my ($lhost, $warn, $crit) = @_;
if (!($perfcounter_speriod > 0)) {
my $status |= $MYERRORS_MASK{'UNKNOWN'};
print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n");
return ;
}
@ -765,7 +777,7 @@ sub memhost_do {
$output = "Memory used : " . simplify_number($mem_used / 1024 / 1024) . " Go - size : " . simplify_number($memory_size / 1024 / 1024 / 1024) . " Go - percent : " . simplify_number($mem_used * 100 / ($memory_size / 1024)) . " %";
$output .= "|used=" . ($mem_used * 1024) . "o;" . simplify_number($memory_size * $warn / 100, 0) . ";" . simplify_number($memory_size * $crit / 100, 0) . ";0;" . ($memory_size) . " size=" . $memory_size . "o" . " overhead=" . ($mem_overhead * 1024) . "o";
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -804,7 +816,7 @@ sub swaphost_do {
my ($lhost, $warn, $crit) = @_;
if (!($perfcounter_speriod > 0)) {
my $status |= $MYERRORS_MASK{'UNKNOWN'};
print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n");
return ;
}
@ -836,7 +848,7 @@ sub swaphost_do {
$output = "Swap In : " . simplify_number($swap_in / 1024 * 8) . " Mb/s , Swap Out : " . simplify_number($swap_out / 1024 * 8) . " Mb/s ";
$output .= "|swap_in=" . ($swap_in * 1024 * 8) . "b/s swap_out=" . (($swap_out * 1024 * 8)) . "b/s";
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
@ -870,7 +882,7 @@ sub listhost_do {
$output_append = ', ';
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -918,7 +930,7 @@ sub listdatastore_do {
}
}
print $ERRORS{$MYERRORS{$status}} . "|$output\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output\n");
}
############
@ -963,7 +975,7 @@ sub listnichost_do {
}
}
print $ERRORS{$MYERRORS{$status}} . "|$output_up. $output_down.\n";
print_response($ERRORS{$MYERRORS{$status}} . "|$output_up. $output_down.\n");
}
############
@ -987,28 +999,16 @@ sub verify_child {
# Verify process
foreach (keys %child_proc) {
if (defined($return_child{$child_proc{$_}->{'pid'}}) && $return_child{$child_proc{$_}->{'pid'}}->{'status'} == 1) {
# Check ctime
if (time() - $child_proc{$_}->{'ctime'} > $TIMEOUT) {
my $handle = ${$child_proc{$_}->{'reading'}};
my $output = <$handle>;
print $writer_pipe_one "$_|-1|Timeout Process.\n";
kill('INT', $child_proc{$_}->{'pid'});
$read_select->remove($handle);
close $handle;
if ($output =~ /^-1/) {
$last_time_check = $child_proc{$_}->{'ctime'};
}
chomp $output;
$response_queue->enqueue("$_|$output\n");
delete $return_child{$child_proc{$_}->{'pid'}};
delete $child_proc{$_};
} else {
# Check ctime
if (time() - $child_proc{$_}->{'ctime'} > $TIMEOUT) {
my $handle = ${$child_proc{$_}->{'reading'}};
$response_queue->enqueue("$_|-1|Timeout Process.\n");
kill('INT', $child_proc{$_}->{'pid'});
close $handle;
delete $child_proc{$_};
} else {
$progress++;
}
$progress++;
}
}
# Clean old hash CHILD (security)
@ -1023,37 +1023,40 @@ sub verify_child {
}
sub vsphere_handler {
my $timeout_process;
$read_select = new IO::Select();
$read_select->add($reader_pipe_two);
while (1) {
if ($stop == 1) {
my $timeout_process = 0;
while ($timeout_process <= $TIMEOUT_KILL) {
if (!verify_child()) {
last;
}
$timeout_process++;
sleep(1);
}
if ($timeout_process > $TIMEOUT_KILL) {
writeLogFile("Kill child not gently.\n");
foreach (keys %child_proc) {
kill('INT', $child_proc{$_}->{'pid'});
}
}
my $progress = verify_child();
#####
# Manage ending
#####
if ($stop && $timeout_process > $TIMEOUT_KILL) {
writeLogFile("Kill child not gently.\n");
foreach (keys %child_proc) {
kill('INT', $child_proc{$_}->{'pid'});
}
$progress = 0;
}
if ($stop && !$progress) {
if ($vsphere_connected) {
eval {
Vim::logout();
$session1->logout();
};
}
$response_queue->enqueue("STOPPED\n");
print $writer_pipe_one "STOPPED\n";
exit (0);
}
###
# Manage vSphere connection
###
if (defined($last_time_vsphere) && defined($last_time_check) && $last_time_vsphere < $last_time_check) {
writeLogFile("Deconnect\n");
$vsphere_connected = 0;
eval {
Vim::logout();
$session1->logout();
};
}
if ($vsphere_connected == 0) {
@ -1069,11 +1072,14 @@ sub vsphere_handler {
}
}
###
# Manage session time
###
if (defined($keeper_session_time) && (time() - $keeper_session_time) > ($REFRESH_KEEPER_SESSION * 60)) {
my $stime;
eval {
$stime = Vim::get_service_instance()->CurrentTime();
$stime = $session1->get_service_instance()->CurrentTime();
$keeper_session_time = time();
};
if ($@) {
@ -1087,48 +1093,71 @@ sub vsphere_handler {
}
my $data_element;
while (($data_element = $data_queue->dequeue_nb())) {
chomp $data_element;
if ($data_element =~ /^STOP$/) {
$stop = 1;
next;
}
my ($id) = split(/\|/, $data_element);
if ($vsphere_connected) {
writeLogFile("vpshere handler asking: $data_element\n");
$child_proc{$id} = {'ctime' => time()};
my $reader;
my $writer;
pipe($reader, $writer);
$writer->autoflush(1);
$child_proc{$id}->{'reading'} = \*$reader;
$child_proc{$id}->{'pid'} = fork;
if (!$child_proc{$id}->{'pid'}) {
# Child
close $reader;
open STDOUT, '>&', $writer;
my ($id, $name, @args) = split /\|/, $data_element;
$checks_descr{$name}->{'exec'}($checks_descr{$name}->{'compute'}(@args));
exit(0);
} else {
# Parent
close $writer;
}
} else {
$response_queue->enqueue("$id|-1|Vsphere connection error.");
}
}
verify_child();
my @rh_set;
if ($vsphere_connected == 0) {
sleep(5);
} else {
Time::HiRes::sleep(0.2);
}
if ($stop == 0) {
@rh_set = $read_select->can_read(30);
} else {
sleep(1);
$timeout_process++;
@rh_set = $read_select->can_read(0);
}
foreach my $rh (@rh_set) {
if ($rh == $reader_pipe_two && !$stop) {
$data_element = <$rh>;
chomp $data_element;
if ($data_element =~ /^STOP$/) {
$stop = 1;
$timeout_process = 0;
next;
}
my ($id) = split(/\|/, $data_element);
if ($vsphere_connected) {
writeLogFile("vpshere handler asking: $data_element\n");
$child_proc{$id} = {'ctime' => time()};
my $reader;
my $writer;
pipe($reader, $writer);
$writer->autoflush(1);
$read_select->add($reader);
$child_proc{$id}->{'reading'} = \*$reader;
$child_proc{$id}->{'pid'} = fork;
if (!$child_proc{$id}->{'pid'}) {
# Child
close $reader;
open STDOUT, '>&', $writer;
my ($id, $name, @args) = split /\|/, $data_element;
$global_id = $id;
$checks_descr{$name}->{'exec'}($checks_descr{$name}->{'compute'}(@args));
exit(0);
} else {
# Parent
close $writer;
}
} else {
print $writer_pipe_one "$id|-1|Vsphere connection error.\n";
}
} else {
# Read pipe
my $output = <$rh>;
$read_select->remove($rh);
close $rh;
$output =~ s/^(.*?)\|//;
my $lid = $1;
if ($output =~ /^-1/) {
$last_time_check = $child_proc{$lid}->{'ctime'};
}
chomp $output;
print $writer_pipe_one "$lid|$output\n";
delete $return_child{$child_proc{$lid}->{'pid'}};
delete $child_proc{$lid};
}
}
}
}
@ -1139,97 +1168,118 @@ open my $centesx_fh, '>>', $LOG;
open STDOUT, '>&', $centesx_fh;
open STDERR, '>&', $centesx_fh;
$data_queue = Thread::Queue->new();
$response_queue = Thread::Queue->new();
my $thr = threads->create(\&vsphere_handler);
$thr->detach();
pipe($reader_pipe_one, $writer_pipe_one);
pipe($reader_pipe_two, $writer_pipe_two);
$writer_pipe_one->autoflush(1);
$writer_pipe_two->autoflush(1);
my $server = IO::Socket::INET->new( Proto => "tcp",
LocalPort => $port,
Listen => SOMAXCONN,
Reuse => 1,
Timeout => 0.5);
Reuse => 1);
if (!$server) {
writeLogFile("Can't setup server: $!\n");
exit(1);
}
$child_vpshere_pid = fork();
if (!$child_vpshere_pid) {
close $reader_pipe_one;
close $writer_pipe_two;
vsphere_handler();
exit(0);
}
close $writer_pipe_one;
close $reader_pipe_two;
$read_select = new IO::Select();
$read_select->add($server);
$read_select->add($reader_pipe_one);
writeLogFile("[Server accepting clients]\n");
while (1) {
my $client;
if (!$stop) {
$client = $server->accept();
}
if ($stop == 1) {
my @rh_set = $read_select->can_read(30);
if ($stop == 1) {
writeLogFile("Send STOP command to thread.\n");
$data_queue->enqueue("STOP\n");
print $writer_pipe_two "STOP\n";
$stop = 2;
}
###
# Check
###
my $data_element;
while (($data_element = $response_queue->dequeue_nb())) {
chomp $data_element;
if ($data_element =~ /^STOPPED$/) {
writeLogFile("Thread has stopped\n");
exit(0);
}
# Verify responde queue
#print "Response queue = $data_element\n";
my @results = split(/\|/, $data_element);
my $id = $results[0];
if (!defined($sockets{$id})) {
writeLogFile("Too much time to get response.\n");
next;
}
foreach my $rh (@rh_set) {
if (!$stop && $rh == $server) {
my $client;
writeLogFile("response = $data_element\n");
$data_element =~ s/^.*?\|//;
${$sockets{$id}->{'obj'}}->send($data_element . "\n");
close ${$sockets{$id}->{"obj"}};
delete $sockets{$id};
# Connect to accept
$client = $rh->accept();
$client->autoflush(1);
$counter++;
$sockets{fileno($client)} = {"obj" => \$client, "ctime" => time(), "counter" => $counter};
$read_select->add($client);
next;
} elsif ($rh == $reader_pipe_one) {
# Return to read
my $data_element = <$rh>;
chomp $data_element;
if ($data_element =~ /^STOPPED$/) {
writeLogFile("Thread has stopped\n");
exit(0);
}
# Verify response queue
#print "Response queue = $data_element\n";
my @results = split(/\|/, $data_element);
my ($id, $counter) = split(/\./, $results[0]);
if (!defined($sockets{$id}) || $counter != $sockets{$id}->{'counter'}) {
writeLogFile("Too much time to get response.\n");
next;
}
writeLogFile("response = $data_element\n");
$data_element =~ s/^.*?\|//;
${$sockets{$id}->{'obj'}}->send($data_element . "\n");
$read_select->remove(${$sockets{$id}->{"obj"}});
close ${$sockets{$id}->{"obj"}};
delete $sockets{$id};
} else {
# Socket
my $line = <$rh>;
if (defined($line) && $line ne "") {
chomp $line;
my ($name, @args) = split /\|/, $line;
if (!defined($checks_descr{$name})) {
$rh->send("3|Unknown method name '$name'\n");
$read_select->remove($rh);
close $rh;
delete $sockets{fileno($rh)};
next;
}
if ($checks_descr{$name}->{'arg'}(@args)) {
$rh->send("3|Params error '$name'\n");
$read_select->remove($rh);
close $rh;
delete $sockets{fileno($rh)};
next;
}
print $writer_pipe_two fileno($rh) . "." . $sockets{fileno($rh)}->{'counter'} . "|$line\n";
} else {
delete $sockets{fileno($rh)};
$rh->send("3|Need arguments\n");
$read_select->remove($rh);
close $rh;
}
}
}
# Verify socket
foreach (keys %sockets) {
if (time() - $sockets{$_}->{'ctime'} > $TIMEOUT) {
writeLogFile("Timeout returns for uuid = '" . $sockets{$_}->{'uuid'} . "'.\n");
writeLogFile("Timeout returns.\n");
${$sockets{$_}->{'obj'}}->send("3|TIMEOUT\n");
$read_select->remove(${$sockets{$_}->{"obj"}});
close ${$sockets{$_}->{"obj"}};
delete $sockets{$_};
}
}
if (!$client) {
next;
}
my $uuid = $counter_request_id;
$counter_request_id++;
$client->autoflush(1);
my $hostinfo = gethostbyaddr($client->peeraddr);
#writeLogFile("[Connect from " . ($hostinfo ? $hostinfo->name : $client->peerhost) . "]\n");
my $line = <$client>;
if (defined($line) && $line ne "") {
chomp $line;
my ($name, @args) = split /\|/, $line;
if (!defined($checks_descr{$name})) {
$client->send("3|Unknown method name '$name'\n");
close $client;
next;
}
if ($checks_descr{$name}->{'arg'}(@args)) {
$client->send("3|Params error '$name'\n");
close $client;
next;
}
$sockets{$uuid} = {"obj" => \$client, "ctime" => time(), "uuid" => $uuid};
$data_queue->enqueue("$uuid|$line\n");
} else {
$client->send("3|Need arguments\n");
close $client;
}
}
exit(0);