From e72a7a42a1cc3e42b6d07a06e0f8fc83c88960f7 Mon Sep 17 00:00:00 2001 From: Quentin Garnier Date: Thu, 13 Sep 2012 11:39:02 +0000 Subject: [PATCH] - Fix race condition with threads::queue git-svn-id: http://svn.merethis.net/centreon-esxd/trunk@9 a5eaa968-4c79-4d68-970d-af6011b5b055 --- connectors/vmware/centreon_esxd | 410 ++++++++++++++++++-------------- 1 file changed, 230 insertions(+), 180 deletions(-) diff --git a/connectors/vmware/centreon_esxd b/connectors/vmware/centreon_esxd index ca98a4537..39efd64da 100644 --- a/connectors/vmware/centreon_esxd +++ b/connectors/vmware/centreon_esxd @@ -9,19 +9,15 @@ use VMware::VIRuntime; use VMware::VILib; use IO::Socket; use Net::hostent; # for OOish version of gethostbyaddr -use threads; -use Thread::Queue; +use IO::Select; use POSIX ":sys_wait_h"; use Data::Dumper; -use Time::HiRes; use vars qw($port $service_url $username $password $TIMEOUT_VSPHERE $TIMEOUT $TIMEOUT_KILL $REFRESH_KEEPER_SESSION $LOG); require '/etc/centreon/centreon_esxd.pm'; our $session_id; -our $data_queue; -our $response_queue; our %sockets = (); our %child_proc; our %return_child; @@ -36,6 +32,15 @@ our $perfcounter_refreshrate = 20; our $perfcounter_speriod = -1; our $stop = 0; our $counter_request_id = 0; +our $child_vpshere_pid; +our $read_select; +our $reader_pipe_one; +our $writer_pipe_one; +our $reader_pipe_two; +our $writer_pipe_two; +our $session1; +our $counter = 0; +our $global_id; our %ERRORS = ( "OK" => 0, "WARNING" => 1, "CRITICAL" => 2, "UNKNOWN" => 3, "PENDING" => 4); our %MYERRORS = (0 => "OK", 1 => "WARNING", 3 => "CRITICAL", 7 => "UNKNOWN"); @@ -66,7 +71,10 @@ sub connect_vsphere { eval { $SIG{ALRM} = sub { die('TIMEOUT'); }; alarm($TIMEOUT_VSPHERE); - Vim::login(service_url=> $service_url, user_name => $username, password => $password); + $session1 = Vim->new(service_url => $service_url); + $session1->login( + user_name => $username, + password => $password); alarm(0); }; if($@) { @@ -75,16 +83,20 @@ sub connect_vsphere { writeLogFile("Login to VirtualCentre server failed: $@"); return 1; } - eval { - $session_id = Vim::get_session_id(); - }; - if($@) { - writeLogFile("Can't get session_id: $@\n"); - return 1; - } +# eval { +# $session_id = Vim::get_session_id(); +# }; +# if($@) { +# writeLogFile("Can't get session_id: $@\n"); +# return 1; +# } return 0; } +sub print_response { + print "$global_id|" . $_[0]; +} + sub output_add($$$$) { my ($output_str, $output_append, $delim, $str) = (shift, shift, shift, shift); $$output_str .= $$output_append . $str; @@ -107,13 +119,13 @@ sub get_views { my $results; eval { - $results = Vim::get_views(mo_ref_array => $_[0], properties => $_[1]); + $results = $session1->get_views(mo_ref_array => $_[0], properties => $_[1]); }; if ($@) { writeLogFile("$@"); my $lerror = $@; $lerror =~ s/\n/ /g; - print "-1|Error: " . $lerror . "\n"; + print_response("-1|Error: " . $lerror . "\n"); return undef; } return $results; @@ -169,7 +181,7 @@ sub generic_performance_values_historic { sub cache_perf_counters { eval { - $perfmanager_view = Vim::get_view(mo_ref => Vim::get_service_content()->perfManager, properties => ['perfCounter', 'historicalInterval']); + $perfmanager_view = $session1->get_view(mo_ref => $session1->get_service_content()->perfManager, properties => ['perfCounter', 'historicalInterval']); foreach (@{$perfmanager_view->perfCounter}) { my $label = $_->groupInfo->key . "." . $_->nameInfo->key . "." . $_->rollupType->val; $perfcounter_cache{$label} = {'key' => $_->key, 'unitkey' => $_->unitInfo->key}; @@ -196,29 +208,29 @@ sub get_entities_host { my $entity_views; eval { - $entity_views = Vim::find_entity_views(view_type => $view_type, properties => $properties, filter => $filters); + $entity_views = $session1->find_entity_views(view_type => $view_type, properties => $properties, filter => $filters); }; if ($@ =~ /decryption failed or bad record mac/) { writeLogFile("$@"); eval { - $entity_views = Vim::find_entity_views(view_type => $view_type, properties => $properties, filter => $filters); + $entity_views = $session1->find_entity_views(view_type => $view_type, properties => $properties, filter => $filters); }; if ($@) { my $lerror = $@; $lerror =~ s/\n/ /g; - print "-1|Error: " . $lerror . "\n"; + print_response("-1|Error: " . Data::Dumper::Dumper($lerror) . "\n"); return undef; } } elsif ($@) { writeLogFile("$@"); my $lerror = $@; $lerror =~ s/\n/ /g; - print "-1|Error: " . $lerror . "\n"; + print_response("-1|Error: " . $lerror . "\n"); return undef; } if (!@$entity_views) { my $status |= $MYERRORS_MASK{'UNKNOWN'}; - print $ERRORS{$MYERRORS{$status}} . "|Host does not exist.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|Host does not exist.\n"); return undef; } #eval { @@ -331,7 +343,7 @@ sub healthhost_do { $output = "All $OKCount health checks are green"; } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -419,7 +431,7 @@ sub datastores_do { $output = "Datastore '$ds' not found or summary not accessible."; $status |= $MYERRORS_MASK{'UNKNOWN'}; } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -461,7 +473,7 @@ sub maintenancehost_do { } } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -521,7 +533,7 @@ sub statushost_do { } } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -560,7 +572,7 @@ sub cpuhost_do { my ($lhost, $warn, $crit) = @_; if (!($perfcounter_speriod > 0)) { my $status |= $MYERRORS_MASK{'UNKNOWN'}; - print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"); return ; } @@ -604,7 +616,7 @@ sub cpuhost_do { $output .= " cpu$instance=" . simplify_number(convert_number($values->{$id}[0]) * 0.01) . "%;;0;100"; } } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -648,7 +660,7 @@ sub nethost_do { my ($lhost, $pnic, $warn, $crit) = @_; if (!($perfcounter_speriod > 0)) { my $status |= $MYERRORS_MASK{'UNKNOWN'}; - print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"); return ; } @@ -692,7 +704,7 @@ sub nethost_do { $output = "Traffic In : " . simplify_number($traffic_in / 1024 * 8) . " Mb/s (" . simplify_number($traffic_in / 1024 * 8 * 100 / $pnic_def{$pnic}) . " %), Out : " . simplify_number($traffic_out / 1024 * 8) . " Mb/s (" . simplify_number($traffic_out / 1024 * 8 * 100 / $pnic_def{$pnic}) . " %)"; $output .= "|traffic_in=" . ($traffic_in * 1024 * 8) . "b/s traffic_out=" . (($traffic_out * 1024 * 8)) . "b/s"; - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -731,7 +743,7 @@ sub memhost_do { my ($lhost, $warn, $crit) = @_; if (!($perfcounter_speriod > 0)) { my $status |= $MYERRORS_MASK{'UNKNOWN'}; - print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"); return ; } @@ -765,7 +777,7 @@ sub memhost_do { $output = "Memory used : " . simplify_number($mem_used / 1024 / 1024) . " Go - size : " . simplify_number($memory_size / 1024 / 1024 / 1024) . " Go - percent : " . simplify_number($mem_used * 100 / ($memory_size / 1024)) . " %"; $output .= "|used=" . ($mem_used * 1024) . "o;" . simplify_number($memory_size * $warn / 100, 0) . ";" . simplify_number($memory_size * $crit / 100, 0) . ";0;" . ($memory_size) . " size=" . $memory_size . "o" . " overhead=" . ($mem_overhead * 1024) . "o"; - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -804,7 +816,7 @@ sub swaphost_do { my ($lhost, $warn, $crit) = @_; if (!($perfcounter_speriod > 0)) { my $status |= $MYERRORS_MASK{'UNKNOWN'}; - print $ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|Can't retrieve perf counters.\n"); return ; } @@ -836,7 +848,7 @@ sub swaphost_do { $output = "Swap In : " . simplify_number($swap_in / 1024 * 8) . " Mb/s , Swap Out : " . simplify_number($swap_out / 1024 * 8) . " Mb/s "; $output .= "|swap_in=" . ($swap_in * 1024 * 8) . "b/s swap_out=" . (($swap_out * 1024 * 8)) . "b/s"; - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } @@ -870,7 +882,7 @@ sub listhost_do { $output_append = ', '; } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -918,7 +930,7 @@ sub listdatastore_do { } } - print $ERRORS{$MYERRORS{$status}} . "|$output\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output\n"); } ############ @@ -963,7 +975,7 @@ sub listnichost_do { } } - print $ERRORS{$MYERRORS{$status}} . "|$output_up. $output_down.\n"; + print_response($ERRORS{$MYERRORS{$status}} . "|$output_up. $output_down.\n"); } ############ @@ -987,28 +999,16 @@ sub verify_child { # Verify process foreach (keys %child_proc) { - if (defined($return_child{$child_proc{$_}->{'pid'}}) && $return_child{$child_proc{$_}->{'pid'}}->{'status'} == 1) { + # Check ctime + if (time() - $child_proc{$_}->{'ctime'} > $TIMEOUT) { my $handle = ${$child_proc{$_}->{'reading'}}; - my $output = <$handle>; + print $writer_pipe_one "$_|-1|Timeout Process.\n"; + kill('INT', $child_proc{$_}->{'pid'}); + $read_select->remove($handle); close $handle; - if ($output =~ /^-1/) { - $last_time_check = $child_proc{$_}->{'ctime'}; - } - chomp $output; - $response_queue->enqueue("$_|$output\n"); - delete $return_child{$child_proc{$_}->{'pid'}}; delete $child_proc{$_}; } else { - # Check ctime - if (time() - $child_proc{$_}->{'ctime'} > $TIMEOUT) { - my $handle = ${$child_proc{$_}->{'reading'}}; - $response_queue->enqueue("$_|-1|Timeout Process.\n"); - kill('INT', $child_proc{$_}->{'pid'}); - close $handle; - delete $child_proc{$_}; - } else { - $progress++; - } + $progress++; } } # Clean old hash CHILD (security) @@ -1023,37 +1023,40 @@ sub verify_child { } sub vsphere_handler { + my $timeout_process; + $read_select = new IO::Select(); + $read_select->add($reader_pipe_two); while (1) { - if ($stop == 1) { - my $timeout_process = 0; - while ($timeout_process <= $TIMEOUT_KILL) { - if (!verify_child()) { - last; - } - $timeout_process++; - sleep(1); - } - if ($timeout_process > $TIMEOUT_KILL) { - writeLogFile("Kill child not gently.\n"); - foreach (keys %child_proc) { - kill('INT', $child_proc{$_}->{'pid'}); - } - } + my $progress = verify_child(); + ##### + # Manage ending + ##### + if ($stop && $timeout_process > $TIMEOUT_KILL) { + writeLogFile("Kill child not gently.\n"); + foreach (keys %child_proc) { + kill('INT', $child_proc{$_}->{'pid'}); + } + $progress = 0; + } + if ($stop && !$progress) { if ($vsphere_connected) { eval { - Vim::logout(); + $session1->logout(); }; } - $response_queue->enqueue("STOPPED\n"); + print $writer_pipe_one "STOPPED\n"; exit (0); } - + ### + # Manage vpshere connection + ### if (defined($last_time_vsphere) && defined($last_time_check) && $last_time_vsphere < $last_time_check) { + writeLogFile("Deconnect\n"); $vsphere_connected = 0; eval { - Vim::logout(); + $session1->logout(); }; } if ($vsphere_connected == 0) { @@ -1069,11 +1072,14 @@ sub vsphere_handler { } } + ### + # Manage session time + ### if (defined($keeper_session_time) && (time() - $keeper_session_time) > ($REFRESH_KEEPER_SESSION * 60)) { my $stime; eval { - $stime = Vim::get_service_instance()->CurrentTime(); + $stime = $session1->get_service_instance()->CurrentTime(); $keeper_session_time = time(); }; if ($@) { @@ -1087,48 +1093,71 @@ sub vsphere_handler { } my $data_element; - while (($data_element = $data_queue->dequeue_nb())) { - chomp $data_element; - if ($data_element =~ /^STOP$/) { - $stop = 1; - next; - } - - my ($id) = split(/\|/, $data_element); - if ($vsphere_connected) { - writeLogFile("vpshere handler asking: $data_element\n"); - $child_proc{$id} = {'ctime' => time()}; - - my $reader; - my $writer; - pipe($reader, $writer); - $writer->autoflush(1); - - $child_proc{$id}->{'reading'} = \*$reader; - $child_proc{$id}->{'pid'} = fork; - if (!$child_proc{$id}->{'pid'}) { - # Child - close $reader; - open STDOUT, '>&', $writer; - my ($id, $name, @args) = split /\|/, $data_element; - $checks_descr{$name}->{'exec'}($checks_descr{$name}->{'compute'}(@args)); - exit(0); - } else { - # Parent - close $writer; - } - } else { - $response_queue->enqueue("$id|-1|Vsphere connection error."); - } - } - - verify_child(); - + my @rh_set; if ($vsphere_connected == 0) { sleep(5); - } else { - Time::HiRes::sleep(0.2); } + if ($stop == 0) { + @rh_set = $read_select->can_read(30); + } else { + sleep(1); + $timeout_process++; + @rh_set = $read_select->can_read(0); + } + foreach my $rh (@rh_set) { + if ($rh == $reader_pipe_two && !$stop) { + $data_element = <$rh>; + chomp $data_element; + if ($data_element =~ /^STOP$/) { + $stop = 1; + $timeout_process = 0; + next; + } + + my ($id) = split(/\|/, $data_element); + if ($vsphere_connected) { + writeLogFile("vpshere handler asking: $data_element\n"); + $child_proc{$id} = {'ctime' => time()}; + + my $reader; + my $writer; + pipe($reader, $writer); + $writer->autoflush(1); + + $read_select->add($reader); + $child_proc{$id}->{'reading'} = \*$reader; + $child_proc{$id}->{'pid'} = fork; + if (!$child_proc{$id}->{'pid'}) { + # Child + close $reader; + open STDOUT, '>&', $writer; + my ($id, $name, @args) = split /\|/, $data_element; + $global_id = $id; + $checks_descr{$name}->{'exec'}($checks_descr{$name}->{'compute'}(@args)); + exit(0); + } else { + # Parent + close $writer; + } + } else { + print $writer_pipe_one "$id|-1|Vsphere connection error.\n"; + } + } else { + # Read pipe + my $output = <$rh>; + $read_select->remove($rh); + close $rh; + $output =~ s/^(.*?)\|//; + my $lid = $1; + if ($output =~ /^-1/) { + $last_time_check = $child_proc{$lid}->{'ctime'}; + } + chomp $output; + print $writer_pipe_one "$lid|$output\n"; + delete $return_child{$child_proc{$lid}->{'pid'}}; + delete $child_proc{$lid}; + } + } } } @@ -1139,97 +1168,118 @@ open my $centesx_fh, '>>', $LOG; open STDOUT, '>&', $centesx_fh; open STDERR, '>&', $centesx_fh; - -$data_queue = Thread::Queue->new(); -$response_queue = Thread::Queue->new(); -my $thr = threads->create(\&vsphere_handler); -$thr->detach(); +pipe($reader_pipe_one, $writer_pipe_one); +pipe($reader_pipe_two, $writer_pipe_two); +$writer_pipe_one->autoflush(1); +$writer_pipe_two->autoflush(1); my $server = IO::Socket::INET->new( Proto => "tcp", LocalPort => $port, Listen => SOMAXCONN, - Reuse => 1, - Timeout => 0.5); + Reuse => 1); if (!$server) { writeLogFile("Can't setup server: $!\n"); exit(1); } + +$child_vpshere_pid = fork(); +if (!$child_vpshere_pid) { + close $reader_pipe_one; + close $writer_pipe_two; + vsphere_handler(); + exit(0); +} +close $writer_pipe_one; +close $reader_pipe_two; + +$read_select = new IO::Select(); +$read_select->add($server); +$read_select->add($reader_pipe_one); writeLogFile("[Server accepting clients]\n"); while (1) { - my $client; - if (!$stop) { - $client = $server->accept(); - } - if ($stop == 1) { + my @rh_set = $read_select->can_read(30); + if ($stop == 1) { writeLogFile("Send STOP command to thread.\n"); - $data_queue->enqueue("STOP\n"); + print $writer_pipe_two "STOP\n"; $stop = 2; } - ### - # Check - ### - my $data_element; - - while (($data_element = $response_queue->dequeue_nb())) { - chomp $data_element; - if ($data_element =~ /^STOPPED$/) { - writeLogFile("Thread has stopped\n"); - exit(0); - } - # Verify responde queue - #print "Response queue = $data_element\n"; - my @results = split(/\|/, $data_element); - my $id = $results[0]; - if (!defined($sockets{$id})) { - writeLogFile("Too much time to get response.\n"); - next; - } + foreach my $rh (@rh_set) { + if (!$stop && $rh == $server) { + my $client; - writeLogFile("response = $data_element\n"); - $data_element =~ s/^.*?\|//; - ${$sockets{$id}->{'obj'}}->send($data_element . "\n"); - close ${$sockets{$id}->{"obj"}}; - delete $sockets{$id}; + # Connect to accept + $client = $rh->accept(); + $client->autoflush(1); + $counter++; + $sockets{fileno($client)} = {"obj" => \$client, "ctime" => time(), "counter" => $counter}; + $read_select->add($client); + next; + } elsif ($rh == $reader_pipe_one) { + # Return to read + my $data_element = <$rh>; + chomp $data_element; + if ($data_element =~ /^STOPPED$/) { + writeLogFile("Thread has stopped\n"); + exit(0); + } + # Verify responde queue + #print "Response queue = $data_element\n"; + my @results = split(/\|/, $data_element); + my ($id, $counter) = split(/\./, $results[0]); + if (!defined($sockets{$id}) || $counter != $sockets{$id}->{'counter'}) { + writeLogFile("Too much time to get response.\n"); + next; + } + + writeLogFile("response = $data_element\n"); + $data_element =~ s/^.*?\|//; + ${$sockets{$id}->{'obj'}}->send($data_element . "\n"); + $read_select->remove(${$sockets{$id}->{"obj"}}); + close ${$sockets{$id}->{"obj"}}; + delete $sockets{$id}; + } else { + # Socket + my $line = <$rh>; + if (defined($line) && $line ne "") { + chomp $line; + my ($name, @args) = split /\|/, $line; + if (!defined($checks_descr{$name})) { + $rh->send("3|Unknown method name '$name'\n"); + $read_select->remove($rh); + close $rh; + delete $sockets{fileno($rh)}; + next; + } + if ($checks_descr{$name}->{'arg'}(@args)) { + $rh->send("3|Params error '$name'\n"); + $read_select->remove($rh); + close $rh; + delete $sockets{fileno($rh)}; + next; + } + + print $writer_pipe_two fileno($rh) . "." . $sockets{fileno($rh)}->{'counter'} . "|$line\n"; + } else { + delete $sockets{fileno($rh)}; + $rh->send("3|Need arguments\n"); + $read_select->remove($rh); + close $rh; + } + } } + + # Verify socket foreach (keys %sockets) { if (time() - $sockets{$_}->{'ctime'} > $TIMEOUT) { - writeLogFile("Timeout returns for uuid = '" . $sockets{$_}->{'uuid'} . "'.\n"); + writeLogFile("Timeout returns.\n"); ${$sockets{$_}->{'obj'}}->send("3|TIMEOUT\n"); + $read_select->remove(${$sockets{$_}->{"obj"}}); close ${$sockets{$_}->{"obj"}}; delete $sockets{$_}; } } - if (!$client) { - next; - } - my $uuid = $counter_request_id; - $counter_request_id++; - $client->autoflush(1); - my $hostinfo = gethostbyaddr($client->peeraddr); - #writeLogFile("[Connect from " . ($hostinfo ? $hostinfo->name : $client->peerhost) . "]\n"); - my $line = <$client>; - if (defined($line) && $line ne "") { - chomp $line; - my ($name, @args) = split /\|/, $line; - if (!defined($checks_descr{$name})) { - $client->send("3|Unknown method name '$name'\n"); - close $client; - next; - } - if ($checks_descr{$name}->{'arg'}(@args)) { - $client->send("3|Params error '$name'\n"); - close $client; - next; - } - - $sockets{$uuid} = {"obj" => \$client, "ctime" => time(), "uuid" => $uuid}; - $data_queue->enqueue("$uuid|$line\n"); - } else { - $client->send("3|Need arguments\n"); - close $client; - } } exit(0);