+ begin to work

This commit is contained in:
Quentin Garnier 2014-10-17 17:33:33 +02:00
parent 9618d5e771
commit c47cda1b7c
2 changed files with 78 additions and 205 deletions

View File

@ -2,6 +2,8 @@
use warnings;
use centreon::script::centreonesxd;
use FindBin;
use lib "$FindBin::Bin";
centreon::script::centreonesxd->new()->run();

View File

@ -5,9 +5,9 @@ package centreon::script::centreonesxd;
use strict;
use VMware::VIRuntime;
use VMware::VILib;
use IO::Socket;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(:all);
use File::Basename;
use IO::Select;
use POSIX ":sys_wait_h";
use Data::Dumper;
use centreon::script;
@ -252,11 +252,6 @@ sub verify_child_vsphere {
if (defined($self->{centreonesxd_config}->{vsphere_server}->{$_})) {
$self->{logger}->writeLogError("Sub-process for '" . $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} . "' dead ???!! We relaunch it");
close $self->{centreonesxd_config}->{vsphere_server}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} }->{writer_two};
close $self->{centreonesxd_config}->{vsphere_server}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name} }->{reader_one};
delete $self->{filenos}->{ $self->{centreonesxd_config}->{vsphere_server}->{$_}->{fd} };
$self->{read_select}->remove($self->{centreonesxd_config}->{vsphere_server}->{$_}->{fd});
$self->create_vsphere_child(vsphere_name => $self->{centreonesxd_config}->{vsphere_server}->{$_}->{name});
delete $self->{centreonesxd_config}->{vsphere_server}->{$_};
}
@ -266,17 +261,14 @@ sub verify_child_vsphere {
sub verify_child {
my $self = shift;
my $progress = 0;
my $handle_writer_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one}};
# Verify process
foreach (keys %{$self->{child_proc}}) {
# Check ctime
if (time() - $self->{child_proc}->{$_}->{ctime} > $self->{centreonesxd_config}->{timeout}) {
my $handle = ${$self->{child_proc}->{$_}->{reading}};
print $handle_writer_pipe "$_|-1|Timeout Process.\n";
print "===timeout papa===\n";
#print $handle_writer_pipe "$_|-1|Timeout Process.\n";
kill('INT', $self->{child_proc}->{$_}->{pid});
$self->{read_select}->remove($handle);
close $handle;
delete $self->{child_proc}->{$_};
} else {
$progress++;
@ -297,11 +289,13 @@ sub vsphere_handler {
my $self = shift;
my $timeout_process;
my $handle_reader_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two}};
my $fileno_reader = fileno($handle_reader_pipe);
my $handle_writer_pipe = ${$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one}};
$self->{read_select} = new IO::Select();
$self->{read_select}->add($handle_reader_pipe);
my $context = zmq_init();
my $backend = zmq_socket($context, ZMQ_DEALER);
#zmq_setsockopt($backend, ZMQ_IDENTITY, "server-" . $self->{whoaim});
#zmq_connect($backend, 'ipc://routing.ipc');
#zmq_sendmsg($backend, 'ready');
while (1) {
my $progress = $self->verify_child();
@ -321,7 +315,6 @@ sub vsphere_handler {
$self->{session1}->logout();
};
}
print $handle_writer_pipe "STOPPED|$self->{whoaim}\n";
exit (0);
}
@ -379,68 +372,28 @@ sub vsphere_handler {
if ($self->{vsphere_connected} == 0) {
sleep(5);
}
if ($self->{stop} == 0) {
@rh_set = $self->{read_select}->can_read(30);
} else {
if ($self->{stop} != 0) {
sleep(1);
$timeout_process++;
@rh_set = $self->{read_select}->can_read(0);
next;
}
foreach my $rh (@rh_set) {
if (fileno($rh) == $fileno_reader && !$self->{stop}) {
$data_element = <$rh>;
chomp $data_element;
if ($data_element =~ /^STOP$/) {
$self->{stop} = 1;
$timeout_process = 0;
next;
}
my ($id) = split(/\Q$self->{separatorin}\E/, $data_element);
if ($self->{vsphere_connected}) {
$self->{logger}->writeLogInfo("vpshere '" . $self->{whoaim} . "' handler asking: $data_element");
$self->{child_proc}->{$id} = {ctime => time()};
my $message = zmq_recvmsg($backend, ZMQ_DONTWAIT);
if (defined($message)) {
if ($self->{vsphere_connected}) {
$self->{logger}->writeLogInfo("vpshere '" . $self->{whoaim} . "' handler asking: $message");
my $reader;
my $writer;
pipe($reader, $writer);
$writer->autoflush(1);
$self->{read_select}->add($reader);
$self->{child_proc}->{$id}->{reading} = \*$reader;
$self->{child_proc}->{$id}->{pid} = fork;
if (!$self->{child_proc}->{$id}->{pid}) {
# Child
close $reader;
open STDOUT, '>&', $writer;
# Can't print on stdout
$self->{logger}->{log_mode} = 1 if ($self->{logger}->{log_mode} == 0);
my ($id, $name, @args) = split /\Q$self->{separatorin}\E/, $data_element;
$self->{global_id} = $id;
$self->{modules_registry}->{$name}->initArgs(@args);
$self->{modules_registry}->{$name}->run();
exit(0);
} else {
# Parent
close $writer;
}
} else {
print $handle_writer_pipe "$id|-1|Vsphere connection error.\n";
$self->{child_proc}->{pid} = fork;
if (!$self->{child_proc}->{pid}) {
# Child
$self->{logger}->{log_mode} = 1 if ($self->{logger}->{log_mode} == 0);
my ($id, $name, @args) = split /\Q$self->{separatorin}\E/, $data_element;
$self->{global_id} = $id;
$self->{modules_registry}->{$name}->initArgs(@args);
$self->{modules_registry}->{$name}->run();
exit(0);
}
} else {
# Read pipe
my $output = <$rh>;
$self->{read_select}->remove($rh);
close $rh;
$output =~ s/^(.*?)\|//;
my $lid = $1;
if ($output =~ /^-1/) {
$self->{last_time_check} = $self->{child_proc}->{$lid}->{ctime};
}
chomp $output;
print $handle_writer_pipe "$lid|$output\n";
delete $self->{return_child}->{$self->{child_proc}->{$lid}->{pid}};
delete $self->{child_proc}->{$lid};
#print $handle_writer_pipe "$id|-1|Vsphere connection error.\n";
}
}
}
@ -450,33 +403,15 @@ sub create_vsphere_child {
my ($self, %options) = @_;
$self->{logger}->writeLogInfo("Create vsphere sub-process for '" . $options{vsphere_name} . "'");
my ($reader_pipe_one, $writer_pipe_one);
my ($reader_pipe_two, $writer_pipe_two);
$self->{whoaim} = $options{vsphere_name};
pipe($reader_pipe_one, $writer_pipe_one);
pipe($reader_pipe_two, $writer_pipe_two);
$writer_pipe_one->autoflush(1);
$writer_pipe_two->autoflush(1);
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_one} = \*$reader_pipe_one;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one} = \*$writer_pipe_one;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two} = \*$reader_pipe_two;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_two} = \*$writer_pipe_two;
$self->{child_vpshere_pid} = fork();
if (!$self->{child_vpshere_pid}) {
close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_one};
close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_two};
$self->vsphere_handler();
exit(0);
}
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{running} = 1;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{child_vpshere_pid}} = { name => $self->{whoaim}, fd => fileno(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}})};
close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{writer_one};
close $self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{reader_two};
$self->{filenos}->{fileno(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}})} = 1;
$self->{read_select}->add(${$self->{centreonesxd_config}->{vsphere_server}->{$options{vsphere_name}}->{reader_one}});
$self->{centreonesxd_config}->{vsphere_server}->{$self->{child_vpshere_pid}} = { name => $self->{whoaim} };
}
sub run {
@ -488,127 +423,63 @@ sub run {
$self->{logger}->writeLogDebug("centreonesxd launched....");
$self->{logger}->writeLogDebug("PID: $$");
my $server = IO::Socket::INET->new( Proto => "tcp",
LocalPort => $self->{centreonesxd_config}->{port},
Listen => SOMAXCONN,
Reuse => 1);
if (!$server) {
my $context = zmq_init();
my $frontend = zmq_socket($context, ZMQ_ROUTER);
zmq_bind($frontend, 'tcp://*:5700');
zmq_bind($frontend, 'ipc://routing.ipc');
if (!defined($frontend)) {
$self->{logger}->writeLogError("Can't setup server: $!");
exit(1);
}
##
# Create childs
##
$self->{read_select} = new IO::Select();
$self->{read_select}->add($server);
foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) {
$self->create_vsphere_child(vsphere_name => $_);
}
my $socket_fileno = fileno($server);
$self->{logger}->writeLogInfo("[Server accepting clients]");
# Initialize poll set
my @poll = (
{
socket => $frontend,
events => ZMQ_POLLIN,
callback => sub {
while (1) {
# Process all parts of the message
my $message = zmq_recvmsg($frontend);
print "=== " . Data::Dumper::Dumper(zmq_msg_data($message)) . " ===\n";
$message = zmq_recvmsg($frontend);
print "=== " . Data::Dumper::Dumper(zmq_msg_data($message)) . " ===\n";
my $more = zmq_getsockopt($frontend, ZMQ_RCVMORE);
#zmq_sendmsg($backend, $message, $more ? ZMQ_SNDMORE : 0);
last unless $more;
return 1;
}
}
},
);
# Switch messages between sockets
while (1) {
my @rh_set = $self->{read_select}->can_read(15);
if ($self->{stop} == 1) {
#if ($self->{stop} == 1) {
# No childs
if (scalar(keys %{$self->{centreonesxd_config}->{vsphere_server}}) == 0) {
$self->{logger}->writeLogInfo("Quit main process");
exit(0);
}
foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) {
$self->{logger}->writeLogInfo("Send STOP command to '$_' child.");
my $writer_handle = $self->{centreonesxd_config}->{vsphere_server}->{$_}->{writer_two};
print $writer_handle "STOP\n";
}
$self->{stop} = 2;
}
foreach my $rh (@rh_set) {
my $current_fileno = fileno($rh);
if (!$self->{stop} && $current_fileno == $socket_fileno) {
my $client;
# Connect to accept
$client = $rh->accept();
$client->autoflush(1);
$self->{counter}++;
$self->{sockets}->{fileno($client)} = {obj => \$client, ctime => time(), counter => $self->{counter}};
$self->{read_select}->add($client);
next;
} elsif (defined($self->{filenos}->{$current_fileno})) {
# Return to read
my $data_element = <$rh>;
chomp $data_element;
if ($data_element =~ /^STOPPED/) {
# We have to wait all childs
my ($name, $which_one) = split(/\|/, $data_element);
$self->{logger}->writeLogInfo("Thread vsphere '$which_one' has stopped");
$self->{centreonesxd_config}->{vsphere_server}->{$which_one}->{running} = 0;
my $to_stop_or_not = 1;
foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) {
$to_stop_or_not = 0 if ($self->{centreonesxd_config}->{vsphere_server}->{$_}->{running} == 1);
}
if ($to_stop_or_not == 1) {
# We quit
$self->{logger}->writeLogInfo("Quit main process");
exit(0);
}
next;
}
my @results = split(/\|/, $data_element);
my ($id, $counter) = split(/\./, $results[0]);
if (!defined($self->{sockets}->{$id}) || $counter != $self->{sockets}->{$id}->{counter}) {
$self->{logger}->writeLogInfo("Too much time to get response.");
next;
}
# if (scalar(keys %{$self->{centreonesxd_config}->{vsphere_server}}) == 0) {
# $self->{logger}->writeLogInfo("Quit main process");
# exit(0);
# }
# foreach (keys %{$self->{centreonesxd_config}->{vsphere_server}}) {
# $self->{logger}->writeLogInfo("Send STOP command to '$_' child.");
# my $send_msg = zmq_msg_init_data($_ . " STOP");
# zmq_msg_send($send_msg, $publisher, 0);
# }
# $self->{stop} = 2;
#}
$self->{logger}->writeLogInfo("response = $data_element");
$data_element =~ s/^.*?\|//;
centreon::esxd::common::response_client2($self, $id, $data_element . "\n");
} else {
# Socket
my $line = <$rh>;
if (defined($line) && $line ne "") {
chomp $line;
my ($name, $vsphere_name, @args) = split /\Q$self->{separatorin}\E/, $line;
if ($name eq 'stats') {
centreon::esxd::common::stats_info($self, $rh, $current_fileno, \@args);
next;
}
if (!defined($self->{modules_registry}->{$name})) {
centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Unknown method name '$name'\n");
next;
}
if ($self->{modules_registry}->{$name}->checkArgs(@args)) {
centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Params error '$name'\n");
next;
}
$vsphere_name = 'default' if (!defined($vsphere_name) || $vsphere_name eq '');
if (!defined($self->{centreonesxd_config}->{vsphere_server}->{$vsphere_name})) {
centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Vsphere name unknown\n");
next;
}
my $tmp_handle = ${$self->{centreonesxd_config}->{vsphere_server}->{$vsphere_name}->{writer_two}};
print $tmp_handle $current_fileno . "." . $self->{sockets}->{$current_fileno}->{counter} . $self->{separatorin} . $name . $self->{separatorin} . join($self->{separatorin}, @args) . "\n";
} else {
centreon::esxd::common::response_client1($self, $rh, $current_fileno, "3|Need arguments\n");
}
}
}
# Check if there some dead sub-process.
$self->verify_child_vsphere();
# Verify socket
foreach (keys %{$self->{sockets}}) {
if (time() - $self->{sockets}->{$_}->{ctime} > $self->{centreonesxd_config}->{timeout}) {
$self->{logger}->writeLogInfo("Timeout returns.");
centreon::esxd::common::response_client2($self, $_, "3|TIMEOUT\n");
}
}
zmq_poll(\@poll, 1000);
}
}