Add system waiting for messages
Dynamic connections
This commit is contained in:
Quentin Garnier 2014-10-28 13:43:53 +01:00
parent a03d67da00
commit a9ee878e3c
2 changed files with 116 additions and 33 deletions

View File

@ -200,6 +200,7 @@ sub run {
zmq_setsockopt($backend, ZMQ_IDENTITY, "server-" . $connector->{whoaim});
zmq_setsockopt($backend, ZMQ_LINGER, 0); # we discard
zmq_connect($backend, 'ipc://routing.ipc');
centreon::esxd::common::response(token => 'READY', endpoint => $backend, stdout => '');
# Initialize poll set
my @poll = (

View File

@ -9,9 +9,9 @@ use VMware::VILib;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(:all);
use File::Basename;
use Digest::MD5 qw(md5_hex);
use POSIX ":sys_wait_h";
use JSON;
use Data::Dumper;
use centreon::script;
use centreon::esxd::common;
use centreon::esxd::connector;
@ -28,32 +28,33 @@ use vars qw(%centreonesxd_config);
my $VERSION = "1.6.0";
my %handlers = (TERM => {}, HUP => {}, CHLD => {});
my @load_modules = ('centreon::esxd::cmdcountvmhost',
'centreon::esxd::cmdcpuhost',
'centreon::esxd::cmdcpuvm',
'centreon::esxd::cmddatastoreio',
'centreon::esxd::cmddatastoreiops',
'centreon::esxd::cmddatastoreshost',
'centreon::esxd::cmddatastoresnapshots',
'centreon::esxd::cmddatastoresvm',
'centreon::esxd::cmddatastoreusage',
'centreon::esxd::cmdgetmap',
'centreon::esxd::cmdhealthhost',
'centreon::esxd::cmdlimitvm',
'centreon::esxd::cmdlistdatastores',
'centreon::esxd::cmdlistnichost',
'centreon::esxd::cmdmemhost',
'centreon::esxd::cmdmaintenancehost',
'centreon::esxd::cmdmemvm',
'centreon::esxd::cmdnethost',
'centreon::esxd::cmdsnapshotvm',
'centreon::esxd::cmdstatushost',
'centreon::esxd::cmdswaphost',
'centreon::esxd::cmdswapvm',
'centreon::esxd::cmdthinprovisioningvm',
'centreon::esxd::cmdtoolsvm',
'centreon::esxd::cmduptimehost'
);
my @load_modules = (
'centreon::esxd::cmdcountvmhost',
'centreon::esxd::cmdcpuhost',
'centreon::esxd::cmdcpuvm',
'centreon::esxd::cmddatastoreio',
'centreon::esxd::cmddatastoreiops',
'centreon::esxd::cmddatastoreshost',
'centreon::esxd::cmddatastoresnapshots',
'centreon::esxd::cmddatastoresvm',
'centreon::esxd::cmddatastoreusage',
'centreon::esxd::cmdgetmap',
'centreon::esxd::cmdhealthhost',
'centreon::esxd::cmdlimitvm',
'centreon::esxd::cmdlistdatastores',
'centreon::esxd::cmdlistnichost',
'centreon::esxd::cmdmemhost',
'centreon::esxd::cmdmaintenancehost',
'centreon::esxd::cmdmemvm',
'centreon::esxd::cmdnethost',
'centreon::esxd::cmdsnapshotvm',
'centreon::esxd::cmdstatushost',
'centreon::esxd::cmdswaphost',
'centreon::esxd::cmdswapvm',
'centreon::esxd::cmdthinprovisioningvm',
'centreon::esxd::cmdtoolsvm',
'centreon::esxd::cmduptimehost'
);
sub new {
my $class = shift;
@ -75,6 +76,7 @@ sub new {
timeout_vsphere => 60,
timeout => 60,
timeout_kill => 30,
dynamic_timeout_kill => 86400,
refresh_keeper_session => 15,
port => 5700,
datastore_state_error => 'UNKNOWN',
@ -238,7 +240,12 @@ sub verify_child_vsphere {
if ($self->{stop} == 0) {
$self->{logger}->writeLogError("Sub-process for '" . $self->{childs_vpshere_pid}->{$_} . "'???!! we relaunch it!!!");
$self->create_vsphere_child(vsphere_name => $self->{childs_vpshere_pid}->{$_});
if ($self->{centreonesxd_config}->{vsphere_server}->{$self->{childs_vpshere_pid}->{$_}}->{dynamic} == 0) {
$self->create_vsphere_child(vsphere_name => $self->{childs_vpshere_pid}->{$_}, dynamic => 0);
} else {
$self->{logger}->writeLogError("Sub-process for '" . $self->{childs_vpshere_pid}->{$_} . "' is dead. But we don't relaunch it (dynamic sub-process)");
delete $self->{centreonesxd_config}->{vsphere_server}->{$self->{childs_vpshere_pid}->{$_}};
}
} else {
$self->{logger}->writeLogInfo("Sub-process for '" . $self->{childs_vpshere_pid}->{$_} . "' dead ???!!");
$self->{centreonesxd_config}->{vsphere_server}->{$self->{childs_vpshere_pid}->{$_}}->{running} = 0;
@ -253,11 +260,70 @@ sub verify_child_vsphere {
if ($self->{centreonesxd_config}->{vsphere_server}->{$_}->{running} == 1) {
$count++;
}
if ($self->{centreonesxd_config}->{vsphere_server}->{$_}->{dynamic} == 1 &&
time() - $self->{centreonesxd_config}->{dynamic_timeout_kill} > $self->{centreonesxd_config}->{vsphere_server}->{$_}->{last_request}) {
$self->{logger}->writeLogError("Send TERM signal for process '" . $_ . "': too many times without requests. We clean it.");
kill('TERM', $self->{centreonesxd_config}->{vsphere_server}->{$_}->{pid});
}
}
return $count;
}
sub waiting_ready {
my ($self, %options) = @_;
return 1 if ($self->{centreonesxd_config}->{vsphere_server}->{$options{container}}->{ready} == 1);
my $time = time();
# We wait 10 seconds
while ($self->{centreonesxd_config}->{vsphere_server}->{$options{container}}->{ready} == 0 &&
time() - $time < 10) {
zmq_poll($self->{poll}, 5000);
}
if ($self->{centreonesxd_config}->{vsphere_server}->{$options{container}}->{ready} == 0) {
$options{manager}->{output}->output_add(severity => 'UNKNOWN',
short_msg => "connector still not ready.");
centreon::esxd::common::response(token => 'RESPSERVER', endpoint => $frontend, identity => $options{identity});
return 0;
}
return 1;
}
sub request_dynamic {
my ($self, %options) = @_;
if (!defined($options{result}->{vsphere_username}) || $options{result}->{vsphere_username} eq '' ||
!defined($options{result}->{vsphere_password}) || $options{result}->{vsphere_password} eq '') {
$options{manager}->{output}->output_add(severity => 'UNKNOWN',
short_msg => "Please set vsphere username or password");
centreon::esxd::common::response(token => 'RESPSERVER', endpoint => $frontend, identity => $options{identity});
return ;
}
my $container = md5_hex($options{result}->{vsphere_address} . $options{result}->{vsphere_username} . $options{result}->{vsphere_password});
# Need to create fork
if (!defined($self->{centreonesxd_config}->{vsphere_server}->{$container})) {
$self->{centreonesxd_config}->{vsphere_server}->{$container} = { url => 'https://' . $options{result}->{vsphere_address} . '/sdk',
username => $options{result}->{vsphere_username},
password => $options{result}->{vsphere_password},
last_request => time() };
$self->{logger}->writeLogError(sprintf("Dynamic creation: identity = %s [address: %s] [username: %s] [password: %s]",
$container, $options{result}->{vsphere_address}, $options{result}->{vsphere_username}, $options{result}->{vsphere_password}));
$centreonesxd->create_vsphere_child(vsphere_name => $container, dynamic => 1);
}
return if ($self->waiting_ready(container => $container, manager => $options{manager},
identity => $options{identity}) == 0);
$self->{centreonesxd_config}->{vsphere_server}->{$container}->{last_request} = time();
my $flag = ZMQ_NOBLOCK | ZMQ_SNDMORE;
zmq_sendmsg($frontend, "server-" . $container, $flag);
zmq_sendmsg($frontend, 'REQCLIENT ' . $options{data}, ZMQ_NOBLOCK);
}
sub request {
my ($self, %options) = @_;
@ -289,6 +355,13 @@ sub request {
centreon::esxd::common::response(token => 'RESPSERVER', endpoint => $frontend, identity => $options{identity});
return ;
}
# Mode dynamic
if (defined($result->{vsphere_address}) && $result->{vsphere_address} ne '') {
$self->request_dynamic(result => $result, %options);
return ;
}
if (!defined($self->{centreonesxd_config}->{vsphere_server}->{$result->{container}})) {
$options{manager}->{output}->output_add(severity => 'UNKNOWN',
short_msg => "Unknown container name '$result->{container}'");
@ -296,6 +369,9 @@ sub request {
return ;
}
return if ($self->waiting_ready(container => $result->{container}, manager => $options{manager},
identity => $options{identity}) == 0);
$self->{counter_stats}->{$result->{container}}++;
my $flag = ZMQ_NOBLOCK | ZMQ_SNDMORE;
zmq_sendmsg($frontend, "server-" . $result->{container}, $flag);
@ -335,12 +411,15 @@ sub router_event {
if ($centreonesxd->{stop} != 0) {
# We quit so we say we're leaving ;)
$manager->{output}->output_add(severity => 'UNKNOWN',
short_msg => 'Daemon is restarting...');
short_msg => 'Daemon is restarting/stopping...');
centreon::esxd::common::response(token => 'RESPSERVER', endpoint => $frontend, identity => $identity);
} elsif ($data =~ /^REQCLIENT\s+(.*)$/msi) {
$centreonesxd->request(identity => $identity, data => $1, manager => $manager);
} elsif ($data =~ /^RESPSERVER2\s+(.*)$/msi) {
$centreonesxd->repserver(data => $1, manager => $manager);
} elsif ($data =~ /^READY/msi) {
$identity =~ /server-(.*)/;
$centreonesxd->{centreonesxd_config}->{vsphere_server}->{$1}->{ready} = 1;
}
my $more = zmq_getsockopt($frontend, ZMQ_RCVMORE);
@ -353,6 +432,7 @@ sub create_vsphere_child {
$self->{whoaim} = $options{vsphere_name};
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{running} = 0;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{ready} = 0;
$self->{logger}->writeLogInfo("Create vsphere sub-process for '" . $options{vsphere_name} . "'");
my $child_vpshere_pid = fork();
@ -367,6 +447,8 @@ sub create_vsphere_child {
}
$self->{childs_vpshere_pid}->{$child_vpshere_pid} = $self->{whoaim};
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{running} = 1;
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{dynamic} = $options{dynamic};
$self->{centreonesxd_config}->{vsphere_server}->{$self->{whoaim}}->{pid} = $child_vpshere_pid;
}
sub run {
@ -392,19 +474,19 @@ sub run {
foreach (keys %{$centreonesxd->{centreonesxd_config}->{vsphere_server}}) {
$centreonesxd->{counter_stats}->{$_} = 0;
$centreonesxd->create_vsphere_child(vsphere_name => $_);
$centreonesxd->create_vsphere_child(vsphere_name => $_, dynamic => 0);
}
$centreonesxd->{logger}->writeLogInfo("[Server accepting clients]");
# Initialize poll set
my @poll = (
$centreonesxd->{poll} = [
{
socket => $frontend,
events => ZMQ_POLLIN,
callback => \&router_event,
},
);
];
# Switch messages between sockets
while (1) {
@ -419,7 +501,7 @@ sub run {
}
}
zmq_poll(\@poll, 5000);
zmq_poll($centreonesxd->{poll}, 5000);
}
}