From c47cda1b7c792af65b3311e678db5cc2ff5b1246 Mon Sep 17 00:00:00 2001 From: Quentin Garnier Date: Fri, 17 Oct 2014 17:33:33 +0200 Subject: [PATCH] + begin to work --- connectors/vmware/centreon_esxd | 2 + connectors/vmware/centreonesxd.pm | 281 ++++++++---------------------- 2 files changed, 78 insertions(+), 205 deletions(-) diff --git a/connectors/vmware/centreon_esxd b/connectors/vmware/centreon_esxd index c63f3f893..aac3e5d2e 100644 --- a/connectors/vmware/centreon_esxd +++ b/connectors/vmware/centreon_esxd @@ -2,6 +2,8 @@ use warnings; use centreon::script::centreonesxd; +use FindBin; +use lib "$FindBin::Bin"; centreon::script::centreonesxd->new()->run(); diff --git a/connectors/vmware/centreonesxd.pm b/connectors/vmware/centreonesxd.pm index d103bab2a..7caa310cf 100644 --- a/connectors/vmware/centreonesxd.pm +++ b/connectors/vmware/centreonesxd.pm @@ -5,9 +5,9 @@ package centreon::script::centreonesxd; use strict; use VMware::VIRuntime; use VMware::VILib; -use IO::Socket; +use ZMQ::LibZMQ3; +use ZMQ::Constants qw(:all); use File::Basename; -use IO::Select; use POSIX ":sys_wait_h"; use Data::Dumper; use centreon::script; @@ -252,11 +252,6 @@ sub verify_child_vsphere { if (defined($self->{centreonesxd_config}->{vsphere_server}->{$_})) { $self->{logger}->writeLogError("Sub-process for '" . $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} . "' dead ???!! We relaunch it"); - close $self->{centreonesxd_config}->{vsphere_server}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} }->{writer_two}; - close $self->{centreonesxd_config}->{vsphere_server}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} }->{reader_one}; - delete $self->{filenos}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{fd} }; - $self->{read_select}->remove($self->{centreonesxd_config}->{vsphere_server}->{$_}->{fd}); - $self->create_vsphere_child(vsphere_name => $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name}); delete $self->{centreonesxd_config}->{vsphere_server}->{$_}; } @@ -266,17 +261,14 @@ sub verify_child_vsphere { sub verify_child { my $self = shift; my $progress = 0; - my $handle_writer_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one}}; # Verify process foreach (keys %{$self->{child_proc}}) { # Check ctime if (time() - $self->{child_proc}->{$_}->{ctime} > $self->{centreonesxd_config}->{timeout}) { - my $handle = ${$self->{child_proc}->{$_}->{reading}}; - print $handle_writer_pipe "$_|-1|Timeout Process.\n"; + print "===timeout papa===\n"; + #print $handle_writer_pipe "$_|-1|Timeout Process.\n"; kill('INT', $self->{child_proc}->{$_}->{pid}); - $self->{read_select}->remove($handle); - close $handle; delete $self->{child_proc}->{$_}; } else { $progress++; @@ -297,11 +289,13 @@ sub vsphere_handler { my $self = shift; my $timeout_process; - my $handle_reader_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two}}; - my $fileno_reader = fileno($handle_reader_pipe); - my $handle_writer_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one}}; - $self->{read_select} = new IO::Select(); - $self->{read_select}->add($handle_reader_pipe); + my $context = zmq_init(); + + my $backend = zmq_socket($context, ZMQ_DEALER); + #zmq_setsockopt($backend, ZMQ_IDENTITY, "server-" . $self->{whoaim}); + #zmq_connect($backend, 'ipc://routing.ipc'); + #zmq_sendmsg($backend, 'ready'); + while (1) { my $progress = $self->verify_child(); @@ -320,8 +314,7 @@ sub vsphere_handler { eval { $self->{session1}->logout(); }; - } - print $handle_writer_pipe "STOPPED|$self->{whoaim}\n"; + } exit (0); } @@ -379,104 +372,46 @@ sub vsphere_handler { if ($self->{vsphere_connected} == 0) { sleep(5); } - if ($self->{stop} == 0) { - @rh_set = $self->{read_select}->can_read(30); - } else { + if ($self->{stop} != 0) { sleep(1); - $timeout_process++; - @rh_set = $self->{read_select}->can_read(0); + next; } - foreach my $rh (@rh_set) { - if (fileno($rh) == $fileno_reader && !$self->{stop}) { - $data_element = <$rh>; - chomp $data_element; - if ($data_element =~ /^STOP$/) { - $self->{stop} = 1; - $timeout_process = 0; - next; - } + + my $message = zmq_recvmsg($backend, ZMQ_DONTWAIT); + if (defined($message)) { + if ($self->{vsphere_connected}) { + $self->{logger}->writeLogInfo("vpshere '" . $self->{whoaim} . "' handler asking: $message"); - my ($id) = split(/\Q$self->{separatorin}\E/, $data_element); - if ($self->{vsphere_connected}) { - $self->{logger}->writeLogInfo("vpshere '" . $self->{whoaim} . "' handler asking: $data_element"); - $self->{child_proc}->{$id} = {ctime => time()}; - - my $reader; - my $writer; - pipe($reader, $writer); - $writer->autoflush(1); - - $self->{read_select}->add($reader); - $self->{child_proc}->{$id}->{reading} = \*$reader; - $self->{child_proc}->{$id}->{pid} = fork; - if (!$self->{child_proc}->{$id}->{pid}) { - # Child - close $reader; - open STDOUT, '>&', $writer; - # Can't print on stdout - $self->{logger}->{log_mode} = 1 if ($self->{logger}->{log_mode} == 0); - my ($id, $name, @args) = split /\Q$self->{separatorin}\E/, $data_element; - $self->{global_id} = $id; - $self->{modules_registry}->{$name}->initArgs(@args); - $self->{modules_registry}->{$name}->run(); - exit(0); - } else { - # Parent - close $writer; - } - } else { - print $handle_writer_pipe "$id|-1|Vsphere connection error.\n"; + $self->{child_proc}->{pid} = fork; + if (!$self->{child_proc}->{pid}) { + # Child + $self->{logger}->{log_mode} = 1 if ($self->{logger}->{log_mode} == 0); + my ($id, $name, @args) = split /\Q$self->{separatorin}\E/, $data_element; + $self->{global_id} = $id; + $self->{modules_registry}->{$name}->initArgs(@args); + $self->{modules_registry}->{$name}->run(); + exit(0); } } else { - # Read pipe - my $output = <$rh>; - $self->{read_select}->remove($rh); - close $rh; - $output =~ s/^(.*?)\|//; - my $lid = $1; - if ($output =~ /^-1/) { - $self->{last_time_check} = $self->{child_proc}->{$lid}->{ctime}; - } - chomp $output; - print $handle_writer_pipe "$lid|$output\n"; - delete $self->{return_child}->{$self->{child_proc}->{$lid}->{pid}}; - delete $self->{child_proc}->{$lid}; + #print $handle_writer_pipe "$id|-1|Vsphere connection error.\n"; } - } + } } } sub create_vsphere_child { my ($self, %options) = @_; - $self->{logger}->writeLogInfo("Create vsphere sub-process for '" . $options{vsphere_name} . "'"); - my ($reader_pipe_one, $writer_pipe_one); - my ($reader_pipe_two, $writer_pipe_two); + $self->{logger}->writeLogInfo("Create vsphere sub-process for '" . $options{vsphere_name} . "'"); $self->{whoaim} = $options{vsphere_name}; - pipe($reader_pipe_one, $writer_pipe_one); - pipe($reader_pipe_two, $writer_pipe_two); - $writer_pipe_one->autoflush(1); - $writer_pipe_two->autoflush(1); - - $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_one} = \*$reader_pipe_one; - $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one} = \*$writer_pipe_one; - $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two} = \*$reader_pipe_two; - $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_two} = \*$writer_pipe_two; $self->{child_vpshere_pid} = fork(); if (!$self->{child_vpshere_pid}) { - close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_one}; - close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_two}; $self->vsphere_handler(); exit(0); } $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{running} = 1; - $self->{centreonesxd_config}->{vsphere_server}->{$self->{child_vpshere_pid}} = { name => $self->{whoaim}, fd => fileno(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}})}; - close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one}; - close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two}; - - $self->{filenos}->{fileno(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}})} = 1; - $self->{read_select}->add(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}}); + $self->{centreonesxd_config}->{vsphere_server}->{$self->{child_vpshere_pid}} = { name => $self->{whoaim} }; } sub run { @@ -488,127 +423,63 @@ sub run { $self->{logger}->writeLogDebug("centreonesxd launched...."); $self->{logger}->writeLogDebug("PID: $$"); - my $server = IO::Socket::INET->new( Proto => "tcp", - LocalPort => $self->{centreonesxd_config}->{port}, - Listen => SOMAXCONN, - Reuse => 1); - if (!$server) { + my $context = zmq_init(); + my $frontend = zmq_socket($context, ZMQ_ROUTER); + + zmq_bind($frontend, 'tcp://*:5700'); + zmq_bind($frontend, 'ipc://routing.ipc'); + + if (!defined($frontend)) { $self->{logger}->writeLogError("Can't setup server: $!"); exit(1); } - - ## - # Create childs - ## - $self->{read_select} = new IO::Select(); - $self->{read_select}->add($server); foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) { $self->create_vsphere_child(vsphere_name => $_); } - my $socket_fileno = fileno($server); $self->{logger}->writeLogInfo("[Server accepting clients]"); - while (1) { - my @rh_set = $self->{read_select}->can_read(15); - if ($self->{stop} == 1) { - # No childs - if (scalar(keys %{$self->{centreonesxd_config}->{vsphere_server}}) == 0) { - $self->{logger}->writeLogInfo("Quit main process"); - exit(0); - } - foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) { - $self->{logger}->writeLogInfo("Send STOP command to '$_' child."); - my $writer_handle = $self->{centreonesxd_config}->{vsphere_server}->{$_}->{writer_two}; - print $writer_handle "STOP\n"; - } - $self->{stop} = 2; - } - foreach my $rh (@rh_set) { - my $current_fileno = fileno($rh); - if (!$self->{stop} && $current_fileno == $socket_fileno) { - my $client; - # Connect to accept - $client = $rh->accept(); - $client->autoflush(1); - $self->{counter}++; - $self->{sockets}->{fileno($client)} = {obj => \$client, ctime => time(), counter => $self->{counter}}; - $self->{read_select}->add($client); - next; - } elsif (defined($self->{filenos}->{$current_fileno})) { - # Return to read - my $data_element = <$rh>; - chomp $data_element; - if ($data_element =~ /^STOPPED/) { - # We have to wait all childs - my ($name, $which_one) = split(/\|/, $data_element); - $self->{logger}->writeLogInfo("Thread vsphere '$which_one' has stopped"); - $self->{centreonesxd_config}->{vsphere_server}->{$which_one}->{running} = 0; - my $to_stop_or_not = 1; - foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) { - $to_stop_or_not = 0 if ($self->{centreonesxd_config}->{vsphere_server}->{$_}->{running} == 1); - } - if ($to_stop_or_not == 1) { - # We quit - $self->{logger}->writeLogInfo("Quit main process"); - exit(0); - } - next; - } - my @results = split(/\|/, $data_element); - my ($id, $counter) = split(/\./, $results[0]); - if (!defined($self->{sockets}->{$id}) || $counter != $self->{sockets}->{$id}->{counter}) { - $self->{logger}->writeLogInfo("Too much time to get response."); - next; - } - - $self->{logger}->writeLogInfo("response = $data_element"); - $data_element =~ s/^.*?\|//; - centreon::esxd::common::response_client2($self, $id, $data_element . "\n"); - } else { - # Socket - my $line = <$rh>; - if (defined($line) && $line ne "") { - chomp $line; - my ($name, $vsphere_name, @args) = split /\Q$self->{separatorin}\E/, $line; + + # Initialize poll set + my @poll = ( + { + socket => $frontend, + events => ZMQ_POLLIN, + callback => sub { + while (1) { + # Process all parts of the message + my $message = zmq_recvmsg($frontend); + print "=== " . Data::Dumper::Dumper(zmq_msg_data($message)) . " ===\n"; + $message = zmq_recvmsg($frontend); + print "=== " . Data::Dumper::Dumper(zmq_msg_data($message)) . " ===\n"; + + my $more = zmq_getsockopt($frontend, ZMQ_RCVMORE); - if ($name eq 'stats') { - centreon::esxd::common::stats_info($self, $rh, $current_fileno, \@args); - next; - } - if (!defined($self->{modules_registry}->{$name})) { - centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Unknown method name '$name'\n"); - next; - } - if ($self->{modules_registry}->{$name}->checkArgs(@args)) { - centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Params error '$name'\n"); - next; - } - - $vsphere_name = 'default' if (!defined($vsphere_name) || $vsphere_name eq ''); - if (!defined($self->{centreonesxd_config}->{vsphere_server}->{$vsphere_name})) { - centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Vsphere name unknown\n"); - next; - } - - my $tmp_handle = ${$self->{centreonesxd_config}->{vsphere_server}->{$vsphere_name}->{writer_two}}; - print $tmp_handle $current_fileno . "." . $self->{sockets}->{$current_fileno}->{counter} . $self->{separatorin} . $name . $self->{separatorin} . join($self->{separatorin}, @args) . "\n"; - } else { - centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Need arguments\n"); + #zmq_sendmsg($backend, $message, $more ? ZMQ_SNDMORE : 0); + last unless $more; + return 1; } } - } + }, + ); - # Check if there some dead sub-process. - $self->verify_child_vsphere(); - - # Verify socket - foreach (keys %{$self->{sockets}}) { - if (time() - $self->{sockets}->{$_}->{ctime} > $self->{centreonesxd_config}->{timeout}) { - $self->{logger}->writeLogInfo("Timeout returns."); - centreon::esxd::common::response_client2($self, $_, "3|TIMEOUT\n"); - } - } + # Switch messages between sockets + while (1) { + #if ($self->{stop} == 1) { + # No childs + # if (scalar(keys %{$self->{centreonesxd_config}->{vsphere_server}}) == 0) { + # $self->{logger}->writeLogInfo("Quit main process"); + # exit(0); + # } + # foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) { + # $self->{logger}->writeLogInfo("Send STOP command to '$_' child."); + # my $send_msg = zmq_msg_init_data($_ . " STOP"); + # zmq_msg_send($send_msg, $publisher, 0); + # } + # $self->{stop} = 2; + #} + + zmq_poll(\@poll, 1000); } }