Gracefully stop pandorafms threads
This commit is contained in:
parent
d160737fb2
commit
b3ac981681
|
@ -75,6 +75,36 @@ sub pandora_shutdown () {
|
|||
|
||||
# Stop the netflow daemon
|
||||
pandora_stop_netflow_daemon ();
|
||||
|
||||
# Stop server threads.
|
||||
stop_server_threads();
|
||||
|
||||
# Wait threads.
|
||||
my $max_wait = 10;
|
||||
my $waiting = 1;
|
||||
my $start_waiting = time();
|
||||
while ($waiting eq 1) {
|
||||
$waiting = 0;
|
||||
foreach my $thr (threads->list()) {
|
||||
#my $tid = shift @{$self->{'_threads'}};
|
||||
#my $thr = threads->object($tid);
|
||||
|
||||
if (defined($thr)) {
|
||||
if ($thr->is_joinable()) {
|
||||
$thr->join();
|
||||
} else {
|
||||
$thr->kill('KILL');
|
||||
if (time() - $start_waiting < $max_wait) {
|
||||
$waiting = 1;
|
||||
} else {
|
||||
# Some discovery external scripts tasks.
|
||||
$thr->detach();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sleep (1);
|
||||
}
|
||||
|
||||
print_message (\%Config, ' [*] Shutting down ' . $Config{'servername'} . "(received signal)...\n", 1);
|
||||
db_disconnect ($DBH);
|
||||
|
@ -109,7 +139,7 @@ sub pandora_startup () {
|
|||
start_server_thread(\&pandora_process_policy_queue, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'policy_manager'} == 1);
|
||||
|
||||
# Start the event replication thread. Do not start with start_server_thread, this thread may exit on its own.
|
||||
threads->create(\&pandora_process_event_replication, \%Config) if($Config{'__enterprise_enabled'} == 1 && $Config{'event_replication'} == 1);
|
||||
start_server_thread(\&pandora_process_event_replication, [\%Config]) if($Config{'__enterprise_enabled'} == 1 && $Config{'event_replication'} == 1);
|
||||
|
||||
pandora_audit (\%Config, $Config{'rb_product_name'} . ' Server Daemon starting', 'SYSTEM', 'System', $DBH);
|
||||
|
||||
|
|
|
@ -88,15 +88,35 @@ sub run ($$$$$) {
|
|||
|
||||
# Launch consumer threads
|
||||
for (1..$self->getNumThreads ()) {
|
||||
my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_consumer, $self,
|
||||
$task_queue, $pending_tasks, $sem, $task_sem);
|
||||
my $thr = threads->create ({'exit' => 'thread_only'},
|
||||
sub {
|
||||
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
|
||||
local $SIG{'KILL'} = sub {
|
||||
$RUN = 0;
|
||||
$task_sem->up();
|
||||
$sem->up();
|
||||
exit 0;
|
||||
};
|
||||
PandoraFMS::ProducerConsumerServer::data_consumer->(@_);
|
||||
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
|
||||
);
|
||||
return unless defined ($thr);
|
||||
$self->addThread ($thr->tid ());
|
||||
}
|
||||
|
||||
# Launch producer thread
|
||||
my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_producer, $self,
|
||||
$task_queue, $pending_tasks, $sem, $task_sem);
|
||||
my $thr = threads->create ({'exit' => 'thread_only'},
|
||||
sub {
|
||||
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
|
||||
local $SIG{'KILL'} = sub {
|
||||
$RUN = 0;
|
||||
$task_sem->up();
|
||||
$sem->up();
|
||||
exit 0;
|
||||
};
|
||||
PandoraFMS::ProducerConsumerServer::data_producer->(@_);
|
||||
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
|
||||
);
|
||||
return unless defined ($thr);
|
||||
$self->addThread ($thr->tid ());
|
||||
}
|
||||
|
@ -124,6 +144,7 @@ sub data_producer ($$$$$) {
|
|||
foreach my $task (@tasks) {
|
||||
$sem->down;
|
||||
|
||||
last if ($RUN == 0);
|
||||
if (defined $pending_tasks->{$task}) {
|
||||
$sem->up;
|
||||
next;
|
||||
|
@ -137,6 +158,7 @@ sub data_producer ($$$$$) {
|
|||
$sem->up;
|
||||
}
|
||||
|
||||
last if ($RUN == 0);
|
||||
# Update queue size for statistics
|
||||
$self->setQueueSize (scalar @{$task_queue});
|
||||
|
||||
|
@ -151,6 +173,7 @@ sub data_producer ($$$$$) {
|
|||
|
||||
$task_sem->up($self->getNumThreads ());
|
||||
db_disconnect ($dbh);
|
||||
exit 0;
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
|
@ -168,12 +191,12 @@ sub data_consumer ($$$$$) {
|
|||
$self->setDBH ($dbh);
|
||||
|
||||
while ($RUN == 1) {
|
||||
|
||||
# Wait for data
|
||||
$self->logThread('[CONSUMER] Waiting for data.');
|
||||
$task_sem->down;
|
||||
|
||||
$sem->down;
|
||||
last if ($RUN == 0);
|
||||
my $task = shift (@{$task_queue});
|
||||
$sem->up;
|
||||
|
||||
|
@ -198,6 +221,7 @@ sub data_consumer ($$$$$) {
|
|||
}
|
||||
|
||||
db_disconnect ($dbh);
|
||||
exit 0;
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
|
|
|
@ -68,7 +68,12 @@ sub run ($$) {
|
|||
$self->setServerID ();
|
||||
|
||||
for (1..$self->{'_num_threads'}) {
|
||||
my $thr = threads->create (\&{$func}, $self);
|
||||
my $thr = threads->create ({'exit' => 'thread_only'},
|
||||
sub {
|
||||
local $SIG{'KILL'} = sub { exit 0; };
|
||||
$func->(@_);
|
||||
}, $self
|
||||
);
|
||||
return unless defined ($thr);
|
||||
push (@{$self->{'_threads'}}, $thr->tid ());
|
||||
}
|
||||
|
@ -301,12 +306,12 @@ sub stop ($) {
|
|||
0, $self->{'_server_type'}, 0, 0);
|
||||
};
|
||||
|
||||
# Detach server threads
|
||||
# Sigkill all server threads
|
||||
foreach my $tid (@{$self->{'_threads'}}) {
|
||||
my $thr = threads->object($tid);
|
||||
next unless defined ($thr);
|
||||
|
||||
$thr->detach();
|
||||
$thr->kill('KILL');
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -164,6 +164,7 @@ our @EXPORT = qw(
|
|||
ui_get_full_url
|
||||
p_encode_json
|
||||
p_decode_json
|
||||
get_server_name
|
||||
);
|
||||
|
||||
# ID of the different servers
|
||||
|
@ -2146,7 +2147,12 @@ sub start_server_thread {
|
|||
# Signal the threads to run.
|
||||
$THRRUN = 1;
|
||||
|
||||
my $thr = threads->create($fn, @{$args});
|
||||
my $thr = threads->create({'exit' => 'thread_only'}, sub {
|
||||
local $SIG{'KILL'} = sub {
|
||||
exit 0;
|
||||
};
|
||||
$fn->(@_)
|
||||
}, @{$args});
|
||||
push(@ServerThreads, $thr);
|
||||
}
|
||||
|
||||
|
@ -2173,7 +2179,7 @@ sub stop_server_threads {
|
|||
$THRRUN = 0;
|
||||
|
||||
foreach my $thr (@ServerThreads) {
|
||||
$thr->join();
|
||||
$thr->kill('KILL');
|
||||
}
|
||||
|
||||
@ServerThreads = ();
|
||||
|
@ -2481,6 +2487,40 @@ sub p_decode_json {
|
|||
return $decoded_data;
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# String name for server type.
|
||||
################################################################################
|
||||
sub get_server_name {
|
||||
my ($server_type) = @_;
|
||||
|
||||
if (!is_numeric($server_type)) {
|
||||
return 'UNKNOWN';
|
||||
}
|
||||
|
||||
return "DATASERVER" if ($server_type eq DATASERVER);
|
||||
return "NETWORKSERVER" if ($server_type eq NETWORKSERVER);
|
||||
return "SNMPCONSOLE" if ($server_type eq SNMPCONSOLE);
|
||||
return "DISCOVERYSERVER" if ($server_type eq DISCOVERYSERVER);
|
||||
return "PLUGINSERVER" if ($server_type eq PLUGINSERVER);
|
||||
return "PREDICTIONSERVER" if ($server_type eq PREDICTIONSERVER);
|
||||
return "WMISERVER" if ($server_type eq WMISERVER);
|
||||
return "EXPORTSERVER" if ($server_type eq EXPORTSERVER);
|
||||
return "INVENTORYSERVER" if ($server_type eq INVENTORYSERVER);
|
||||
return "WEBSERVER" if ($server_type eq WEBSERVER);
|
||||
return "EVENTSERVER" if ($server_type eq EVENTSERVER);
|
||||
return "ICMPSERVER" if ($server_type eq ICMPSERVER);
|
||||
return "SNMPSERVER" if ($server_type eq SNMPSERVER);
|
||||
return "SATELLITESERVER" if ($server_type eq SATELLITESERVER);
|
||||
return "TRANSACTIONALSERVER" if ($server_type eq TRANSACTIONALSERVER);
|
||||
return "MFSERVER" if ($server_type eq MFSERVER);
|
||||
return "SYNCSERVER" if ($server_type eq SYNCSERVER);
|
||||
return "WUXSERVER" if ($server_type eq WUXSERVER);
|
||||
return "SYSLOGSERVER" if ($server_type eq SYSLOGSERVER);
|
||||
return "PROVISIONINGSERVER" if ($server_type eq PROVISIONINGSERVER);
|
||||
return "MIGRATIONSERVER" if ($server_type eq MIGRATIONSERVER);
|
||||
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
1;
|
||||
__END__
|
||||
|
|
Loading…
Reference in New Issue