Merge branch 'ent-7510-muchas-conexiones-sobre-la-bbdd-too-many-connections-mysql' into 'develop'

Avoid die while decoding erroneus json data

See merge request artica/pandorafms!4116
This commit is contained in:
Daniel Rodriguez 2021-06-14 11:39:41 +00:00
commit 42f5727b18
4 changed files with 127 additions and 18 deletions

View File

@ -77,6 +77,36 @@ sub pandora_shutdown () {
# Stop the netflow daemon # Stop the netflow daemon
pandora_stop_netflow_daemon (); pandora_stop_netflow_daemon ();
# Stop server threads.
stop_server_threads();
# Wait threads.
my $max_wait = 10;
my $waiting = 1;
my $start_waiting = time();
while ($waiting eq 1) {
$waiting = 0;
foreach my $thr (threads->list()) {
#my $tid = shift @{$self->{'_threads'}};
#my $thr = threads->object($tid);
if (defined($thr)) {
if ($thr->is_joinable()) {
$thr->join();
} else {
$thr->kill('KILL');
if (time() - $start_waiting < $max_wait) {
$waiting = 1;
} else {
# Some discovery external scripts tasks.
$thr->detach();
}
}
}
}
sleep (1);
}
print_message (\%Config, ' [*] Shutting down ' . $Config{'servername'} . "(received signal)...\n", 1); print_message (\%Config, ' [*] Shutting down ' . $Config{'servername'} . "(received signal)...\n", 1);
db_disconnect ($DBH); db_disconnect ($DBH);
if ($Config{'PID'} ne "") { if ($Config{'PID'} ne "") {
@ -110,7 +140,7 @@ sub pandora_startup () {
start_server_thread(\&pandora_process_policy_queue, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'policy_manager'} == 1); start_server_thread(\&pandora_process_policy_queue, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'policy_manager'} == 1);
# Start the event replication thread. Do not start with start_server_thread, this thread may exit on its own. # Start the event replication thread. Do not start with start_server_thread, this thread may exit on its own.
threads->create(\&pandora_process_event_replication, \%Config) if($Config{'__enterprise_enabled'} == 1 && $Config{'event_replication'} == 1); start_server_thread(\&pandora_process_event_replication, [\%Config]) if($Config{'__enterprise_enabled'} == 1 && $Config{'event_replication'} == 1);
pandora_audit (\%Config, $Config{'rb_product_name'} . ' Server Daemon starting', 'SYSTEM', 'System', $DBH); pandora_audit (\%Config, $Config{'rb_product_name'} . ' Server Daemon starting', 'SYSTEM', 'System', $DBH);
@ -774,7 +804,9 @@ sub main() {
db_do ($DBH, db_do ($DBH,
"UPDATE tserver SET status = -1 "UPDATE tserver SET status = -1
WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(keepalive) > 2*server_keepalive" WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(keepalive) > 2*server_keepalive
AND status != 0 AND server_type != ?",
PandoraFMS::Tools::SATELLITESERVER
); );
# Set the master server # Set the master server

View File

@ -88,15 +88,35 @@ sub run ($$$$$) {
# Launch consumer threads # Launch consumer threads
for (1..$self->getNumThreads ()) { for (1..$self->getNumThreads ()) {
my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_consumer, $self, my $thr = threads->create ({'exit' => 'thread_only'},
$task_queue, $pending_tasks, $sem, $task_sem); sub {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
local $SIG{'KILL'} = sub {
$RUN = 0;
$task_sem->up();
$sem->up();
exit 0;
};
PandoraFMS::ProducerConsumerServer::data_consumer->(@_);
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
return unless defined ($thr); return unless defined ($thr);
$self->addThread ($thr->tid ()); $self->addThread ($thr->tid ());
} }
# Launch producer thread # Launch producer thread
my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_producer, $self, my $thr = threads->create ({'exit' => 'thread_only'},
$task_queue, $pending_tasks, $sem, $task_sem); sub {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
local $SIG{'KILL'} = sub {
$RUN = 0;
$task_sem->up();
$sem->up();
exit 0;
};
PandoraFMS::ProducerConsumerServer::data_producer->(@_);
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
return unless defined ($thr); return unless defined ($thr);
$self->addThread ($thr->tid ()); $self->addThread ($thr->tid ());
} }
@ -124,6 +144,7 @@ sub data_producer ($$$$$) {
foreach my $task (@tasks) { foreach my $task (@tasks) {
$sem->down; $sem->down;
last if ($RUN == 0);
if (defined $pending_tasks->{$task}) { if (defined $pending_tasks->{$task}) {
$sem->up; $sem->up;
next; next;
@ -137,6 +158,7 @@ sub data_producer ($$$$$) {
$sem->up; $sem->up;
} }
last if ($RUN == 0);
# Update queue size for statistics # Update queue size for statistics
$self->setQueueSize (scalar @{$task_queue}); $self->setQueueSize (scalar @{$task_queue});
@ -151,6 +173,7 @@ sub data_producer ($$$$$) {
$task_sem->up($self->getNumThreads ()); $task_sem->up($self->getNumThreads ());
db_disconnect ($dbh); db_disconnect ($dbh);
exit 0;
} }
############################################################################### ###############################################################################
@ -168,12 +191,12 @@ sub data_consumer ($$$$$) {
$self->setDBH ($dbh); $self->setDBH ($dbh);
while ($RUN == 1) { while ($RUN == 1) {
# Wait for data # Wait for data
$self->logThread('[CONSUMER] Waiting for data.'); $self->logThread('[CONSUMER] Waiting for data.');
$task_sem->down; $task_sem->down;
$sem->down; $sem->down;
last if ($RUN == 0);
my $task = shift (@{$task_queue}); my $task = shift (@{$task_queue});
$sem->up; $sem->up;
@ -198,6 +221,7 @@ sub data_consumer ($$$$$) {
} }
db_disconnect ($dbh); db_disconnect ($dbh);
exit 0;
} }
############################################################################### ###############################################################################

View File

@ -68,7 +68,12 @@ sub run ($$) {
$self->setServerID (); $self->setServerID ();
for (1..$self->{'_num_threads'}) { for (1..$self->{'_num_threads'}) {
my $thr = threads->create (\&{$func}, $self); my $thr = threads->create ({'exit' => 'thread_only'},
sub {
local $SIG{'KILL'} = sub { exit 0; };
$func->(@_);
}, $self
);
return unless defined ($thr); return unless defined ($thr);
push (@{$self->{'_threads'}}, $thr->tid ()); push (@{$self->{'_threads'}}, $thr->tid ());
} }
@ -301,12 +306,12 @@ sub stop ($) {
0, $self->{'_server_type'}, 0, 0); 0, $self->{'_server_type'}, 0, 0);
}; };
# Detach server threads # Sigkill all server threads
foreach my $tid (@{$self->{'_threads'}}) { foreach my $tid (@{$self->{'_threads'}}) {
my $thr = threads->object($tid); my $thr = threads->object($tid);
next unless defined ($thr); next unless defined ($thr);
$thr->detach(); $thr->kill('KILL');
} }
} }

View File

@ -164,6 +164,7 @@ our @EXPORT = qw(
ui_get_full_url ui_get_full_url
p_encode_json p_encode_json
p_decode_json p_decode_json
get_server_name
); );
# ID of the different servers # ID of the different servers
@ -2146,7 +2147,12 @@ sub start_server_thread {
# Signal the threads to run. # Signal the threads to run.
$THRRUN = 1; $THRRUN = 1;
my $thr = threads->create($fn, @{$args}); my $thr = threads->create({'exit' => 'thread_only'}, sub {
local $SIG{'KILL'} = sub {
exit 0;
};
$fn->(@_)
}, @{$args});
push(@ServerThreads, $thr); push(@ServerThreads, $thr);
} }
@ -2173,7 +2179,7 @@ sub stop_server_threads {
$THRRUN = 0; $THRRUN = 0;
foreach my $thr (@ServerThreads) { foreach my $thr (@ServerThreads) {
$thr->join(); $thr->kill('KILL');
} }
@ServerThreads = (); @ServerThreads = ();
@ -2462,17 +2468,59 @@ sub p_decode_json {
my ($pa_config, $data) = @_; my ($pa_config, $data) = @_;
my $decoded_data; my $decoded_data;
if ($JSON::VERSION > 2.90) { eval {
# Initialize JSON manager. local $SIG{__DIE__};
my $json = JSON->new->utf8->allow_nonref; if ($JSON::VERSION > 2.90) {
$decoded_data = $json->decode($data); # Initialize JSON manager.
} else { my $json = JSON->new->utf8->allow_nonref;
$decoded_data = decode_json($data); $decoded_data = $json->decode($data);
} else {
$decoded_data = decode_json($data);
}
};
if ($@){
if (defined($data)) {
logger($pa_config, 'Failed to decode data: '.$@, 5);
}
} }
return $decoded_data; return $decoded_data;
} }
################################################################################
# String name for server type.
################################################################################
sub get_server_name {
my ($server_type) = @_;
if (!is_numeric($server_type)) {
return 'UNKNOWN';
}
return "DATASERVER" if ($server_type eq DATASERVER);
return "NETWORKSERVER" if ($server_type eq NETWORKSERVER);
return "SNMPCONSOLE" if ($server_type eq SNMPCONSOLE);
return "DISCOVERYSERVER" if ($server_type eq DISCOVERYSERVER);
return "PLUGINSERVER" if ($server_type eq PLUGINSERVER);
return "PREDICTIONSERVER" if ($server_type eq PREDICTIONSERVER);
return "WMISERVER" if ($server_type eq WMISERVER);
return "EXPORTSERVER" if ($server_type eq EXPORTSERVER);
return "INVENTORYSERVER" if ($server_type eq INVENTORYSERVER);
return "WEBSERVER" if ($server_type eq WEBSERVER);
return "EVENTSERVER" if ($server_type eq EVENTSERVER);
return "ICMPSERVER" if ($server_type eq ICMPSERVER);
return "SNMPSERVER" if ($server_type eq SNMPSERVER);
return "SATELLITESERVER" if ($server_type eq SATELLITESERVER);
return "TRANSACTIONALSERVER" if ($server_type eq TRANSACTIONALSERVER);
return "MFSERVER" if ($server_type eq MFSERVER);
return "SYNCSERVER" if ($server_type eq SYNCSERVER);
return "WUXSERVER" if ($server_type eq WUXSERVER);
return "SYSLOGSERVER" if ($server_type eq SYSLOGSERVER);
return "PROVISIONINGSERVER" if ($server_type eq PROVISIONINGSERVER);
return "MIGRATIONSERVER" if ($server_type eq MIGRATIONSERVER);
return "UNKNOWN";
}
1; 1;
__END__ __END__