Add support for thread stats.

ProducerConsumerServer generates per-thread statistics that are
collected as part of the Pandora FMS's self-monitoring.
This commit is contained in:
Ramon Novoa 2022-12-20 11:44:08 +01:00
parent a600e540bf
commit 380c7150e1
4 changed files with 250 additions and 63 deletions

View File

@ -855,6 +855,7 @@ sub main() {
# Main loop
my $time_ref = time ();
my $thr_time_ref = 0;
my $test_remote_interval = ($Config{'keepalive'}/$Config{'server_threshold'});
my $test_remote = 0;
while ($RUN == 1) {
@ -870,6 +871,15 @@ sub main() {
# Make sure all server threads are running.
die("Server thread crashed.") unless (check_server_threads() == 1);
# Monitor server threads.
if (defined($Config{"self_monitoring"})
&& $Config{"self_monitoring"} == 1
&& !is_metaconsole(\%Config)
&& time() - $thr_time_ref > $Config{'self_monitoring_interval'}) {
$thr_time_ref = time();
pandora_thread_monitoring (\%Config, $DBH, \@Servers);
}
db_do ($DBH,
"UPDATE tserver SET status = -1
WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(keepalive) > 2*server_keepalive

View File

@ -98,6 +98,8 @@ Exported Functions:
=item * C<pandora_self_monitoring>
=item * C<pandora_thread_monitoring>
=item * C<pandora_sample_agent>
=back
@ -258,6 +260,7 @@ our @EXPORT = qw(
pandora_group_statistics
pandora_server_statistics
pandora_self_monitoring
pandora_thread_monitoring
pandora_sample_agent
pandora_process_policy_queue
pandora_sync_agents_integria
@ -6064,6 +6067,67 @@ sub pandora_self_monitoring ($$) {
print XMLFILE $xml_output;
close (XMLFILE);
}
##########################################################################
=head2 C<< pandora_thread_monitoring (I<$pa_config>, I<$dbh>, I<$servers>) >>
Generate stats for Pandora FMS threads.
=cut
##########################################################################
sub pandora_thread_monitoring ($$$) {
my ($pa_config, $dbh, $servers) = @_;
my $utimestamp = time ();
my $timestamp = strftime ("%Y-%m-%d %H:%M:%S", localtime());
my $xml_output = "";
$xml_output = "<agent_data os_name='$OS' os_version='$OS_VERSION' version='" . $pa_config->{'version'} . "' description='" . $pa_config->{'rb_product_name'} . " Server version " . $pa_config->{'version'} . "' agent_name='".$pa_config->{'servername'} . "' agent_alias='".$pa_config->{'servername'} . "' interval='".$pa_config->{"self_monitoring_interval"}."' timestamp='".$timestamp."' >";
foreach my $server (@{$servers}) {
while (my ($tid, $stats) = each(%{$server->getProducerStats()})) {
$xml_output .=" <module>";
$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Status</name>";
$xml_output .=" <type>generic_proc</type>";
$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
$xml_output .=" </module>";
$xml_output .=" <module>";
$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Processing Rate</name>";
$xml_output .=" <type>generic_data</type>";
$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
$xml_output .=" <unit>tasks/second</unit>";
$xml_output .=" </module>";
}
my $idx = 0;
my $consumer_stats = $server->getConsumerStats();
foreach my $tid (sort(keys(%{$consumer_stats}))) {
my $stats = $consumer_stats->{$tid};
$idx += 1;
$xml_output .=" <module>";
$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Status</name>";
$xml_output .=" <type>generic_proc</type>";
$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
$xml_output .=" </module>";
$xml_output .=" <module>";
$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Processing Rate</name>";
$xml_output .=" <type>generic_data</type>";
$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
$xml_output .=" <unit>tasks/second</unit>";
$xml_output .=" </module>";
}
}
$xml_output .= "</agent_data>";
my $filename = $pa_config->{"incomingdir"}."/".$pa_config->{'servername'}.".threads.".$utimestamp.".data";
open (XMLFILE, ">", $filename) or die "[FATAL] Could not write to the thread monitoring XML file '$filename'";
print XMLFILE $xml_output;
close (XMLFILE);
}
##########################################################################
=head2 C<< xml_module_template (I<$module_name>, I<$module_type>, I<$module_data>) >>

View File

@ -89,6 +89,15 @@ sub run ($$$$$) {
# Launch consumer threads
for (1..$self->getNumThreads ()) {
# Enable consumer stats
my $consumer_stats = shared_clone({
'tstamp' => time(),
'rate' => 0,
'rate_count' => 0,
'rate_tstamp' => time()
});
my $thr = threads->create ({'exit' => 'thread_only'},
sub {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
@ -98,13 +107,29 @@ sub run ($$$$$) {
$sem->up();
exit 0;
};
# Make consumer stats reachable from the thread
$self->{'_consumer_stats'}->{threads->tid()} = $consumer_stats;
PandoraFMS::ProducerConsumerServer::data_consumer->(@_);
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
return unless defined ($thr);
$self->addThread ($thr->tid ());
# Make consumer stats reachable from the main program
$self->{'_consumer_stats'}->{$thr->tid()} = $consumer_stats;
}
# Enable producer stats
my $producer_stats = shared_clone({
'tstamp' => time(),
'rate' => 0,
'rate_count' => 0,
'rate_tstamp' => time()
});
# Launch producer thread
my $thr = threads->create ({'exit' => 'thread_only'},
sub {
@ -115,11 +140,18 @@ sub run ($$$$$) {
$sem->up();
exit 0;
};
# Make producer stats reachable from the thread
$self->{'_producer_stats'}->{threads->tid()} = $producer_stats;
PandoraFMS::ProducerConsumerServer::data_producer->(@_);
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
return unless defined ($thr);
$self->addThread ($thr->tid ());
# Make producer stats reachable from the main program
$self->{'_producer_stats'}->{$thr->tid()} = $producer_stats;
}
###############################################################################
@ -130,46 +162,50 @@ sub data_producer ($$$$$) {
my $pa_config = $self->getConfig ();
my $dbh;
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
while ($RUN == 1) {
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
while ($RUN == 1) {
while ($RUN == 1) {
# Get pending tasks
$self->logThread('[PRODUCER] Queuing tasks.');
my @tasks = &{$self->{'_producer'}}($self);
foreach my $task (@tasks) {
$sem->down;
# Get pending tasks
$self->logThread('[PRODUCER] Queuing tasks.');
my @tasks = &{$self->{'_producer'}}($self);
last if ($RUN == 0);
if (defined $pending_tasks->{$task}) {
$sem->up;
next;
}
foreach my $task (@tasks) {
$sem->down;
# Queue task and signal consumers
$pending_tasks->{$task} = 0;
push (@{$task_queue}, $task);
$task_sem->up;
$sem->up;
last if ($RUN == 0);
if (defined $pending_tasks->{$task}) {
$sem->up;
next;
}
# Queue task and signal consumers
$pending_tasks->{$task} = 0;
push (@{$task_queue}, $task);
$task_sem->up;
$sem->up;
}
last if ($RUN == 0);
# Update queue size and thread stats
$self->setQueueSize (scalar @{$task_queue});
$self->updateProducerStats(scalar(@tasks));
threads->yield;
usleep (int(1e6 * $self->getPeriod()));
}
last if ($RUN == 0);
# Update queue size for statistics
$self->setQueueSize (scalar @{$task_queue});
threads->yield;
usleep (int(1e6 * $self->getPeriod()));
};
if ($@) {
print STDERR $@;
}
};
if ($@) {
$self->setErrStr ($@);
}
$task_sem->up($self->getNumThreads ());
@ -185,40 +221,51 @@ sub data_consumer ($$$$$) {
my $pa_config = $self->getConfig ();
my $dbh;
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
my $sem_timeout = $pa_config->{'self_monitoring_interval'} > 0 ?
$pa_config->{'self_monitoring_interval'} :
300;
while ($RUN == 1) {
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
while ($RUN == 1) {
# Wait for data
$self->logThread('[CONSUMER] Waiting for data.');
$task_sem->down;
while ($RUN == 1) {
# Wait for data
$self->logThread('[CONSUMER] Waiting for data.');
while (!$task_sem->down_timed($sem_timeout)) {
$self->updateConsumerStats(0);
}
$sem->down;
last if ($RUN == 0);
my $task = shift (@{$task_queue});
$sem->up;
last if ($RUN == 0);
# The consumer was waiting for data when the producer exited
last if ($RUN == 0);
# Execute task
$self->logThread("[CONSUMER] Executing task: $task");
&{$self->{'_consumer'}}($self, $task);
$sem->down;
my $task = shift (@{$task_queue});
$sem->up;
# Update task status
$sem->down;
delete ($pending_tasks->{$task});
$sem->up;
# The consumer was waiting for data when the producer exited
last if ($RUN == 0);
# Execute task
$self->logThread("[CONSUMER] Executing task: $task");
&{$self->{'_consumer'}}($self, $task);
threads->yield;
# Update thread stats
$self->updateConsumerStats(1);
# Update task status
$sem->down;
delete ($pending_tasks->{$task});
$sem->up;
threads->yield;
}
};
if ($@) {
print STDERR $@;
}
};
if ($@) {
$self->setErrStr ($@);
}
db_disconnect ($dbh);

View File

@ -47,7 +47,9 @@ sub new ($$$;$) {
_threads => [],
_queue_size => 0,
_errstr => '',
_period => 0
_period => 0,
_producer_stats => {},
_consumer_stats => {},
};
# Share variables that may be set from different threads
@ -176,6 +178,24 @@ sub getServerType ($) {
return $self->{'_server_type'};
}
########################################################################################
# Return consumer stats.
########################################################################################
sub getConsumerStats ($) {
my $self = shift;
return $self->{'_consumer_stats'};
}
########################################################################################
# Return producer stats.
########################################################################################
sub getProducerStats ($) {
my $self = shift;
return $self->{'_producer_stats'};
}
########################################################################################
# Set error string.
########################################################################################
@ -337,6 +357,52 @@ sub stop ($) {
}
}
########################################################################################
# Update stats for the current thread.
########################################################################################
sub updateStats ($$$) {
my ($self, $dest, $inc) = @_;
my $tid = threads->tid();
my $curr_time = time();
# Stats disabled for this thread.
if (!defined($dest->{$tid})) {
return;
}
# Update the timestamp and count.
$dest->{$tid}->{'tstamp'} = time();
$dest->{$tid}->{'rate_count'} += $inc;
# Compute the processing rate.
my $elapsed = $curr_time - $dest->{$tid}->{'rate_tstamp'};
if ($elapsed >= $self->{'_pa_config'}->{'self_monitoring_interval'}) {
$dest->{$tid}->{'rate'} = $dest->{$tid}->{'rate_count'} / $elapsed;
$dest->{$tid}->{'rate_count'} = 0;
$dest->{$tid}->{'rate_tstamp'} = $curr_time;
return;
}
}
########################################################################################
# Update producer stats.
########################################################################################
sub updateProducerStats ($$) {
my ($self, $queued_tasks) = @_;
$self->updateStats($self->{'_producer_stats'}, $queued_tasks);
}
########################################################################################
# Update consumer stats.
########################################################################################
sub updateConsumerStats ($$) {
my ($self, $processed_tasks) = @_;
$self->updateStats($self->{'_consumer_stats'}, $processed_tasks);
}
# End of function declaration
# End of defined Code