From 380c7150e1165e9537b9fb6d66e07d82c36c768e Mon Sep 17 00:00:00 2001
From: Ramon Novoa <ramonnovoa@gmail.com>
Date: Tue, 20 Dec 2022 11:44:08 +0100
Subject: [PATCH 1/2] Add support for thread stats.

ProducerConsumerServer generates per-thread statistics that are
collected as part of Pandora FMS's self-monitoring.
---
 pandora_server/bin/pandora_server             |  10 +
 pandora_server/lib/PandoraFMS/Core.pm         |  64 +++++++
 .../lib/PandoraFMS/ProducerConsumerServer.pm  | 171 +++++++++++-------
 pandora_server/lib/PandoraFMS/Server.pm       |  68 ++++++-
 4 files changed, 250 insertions(+), 63 deletions(-)

diff --git a/pandora_server/bin/pandora_server b/pandora_server/bin/pandora_server
index e6ec7ed6dc..45800de908 100755
--- a/pandora_server/bin/pandora_server
+++ b/pandora_server/bin/pandora_server
@@ -855,6 +855,7 @@ sub main() {
 	
 	# Main loop
 	my $time_ref = time ();
+	my $thr_time_ref = 0;
 	my $test_remote_interval = ($Config{'keepalive'}/$Config{'server_threshold'});
 	my $test_remote = 0;
 	while ($RUN == 1) {
@@ -870,6 +871,15 @@ sub main() {
 			# Make sure all server threads are running.
 			die("Server thread crashed.") unless (check_server_threads() == 1);
 
+			# Monitor server threads.
+			if (defined($Config{"self_monitoring"}) 
+				&& $Config{"self_monitoring"} == 1
+				&& !is_metaconsole(\%Config)
+				&& time() - $thr_time_ref > $Config{'self_monitoring_interval'}) {
+				$thr_time_ref = time();
+				pandora_thread_monitoring (\%Config, $DBH, \@Servers);
+			}
+
 			db_do ($DBH,
 				"UPDATE tserver SET status = -1
 				WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(keepalive) > 2*server_keepalive
diff --git a/pandora_server/lib/PandoraFMS/Core.pm b/pandora_server/lib/PandoraFMS/Core.pm
index e4bf6eeaae..18db0d3b64 100644
--- a/pandora_server/lib/PandoraFMS/Core.pm
+++ b/pandora_server/lib/PandoraFMS/Core.pm
@@ -98,6 +98,8 @@ Exported Functions:
 
 =item * C<pandora_self_monitoring>
 
+=item * C<pandora_thread_monitoring>
+
 =item * C<pandora_sample_agent>
 
 =back
@@ -258,6 +260,7 @@ our @EXPORT = qw(
 	pandora_group_statistics
 	pandora_server_statistics
 	pandora_self_monitoring
+	pandora_thread_monitoring
 	pandora_sample_agent
 	pandora_process_policy_queue
 	pandora_sync_agents_integria
@@ -6064,6 +6067,67 @@ sub pandora_self_monitoring ($$) {
 	print XMLFILE $xml_output;
 	close (XMLFILE);
 }
+
+##########################################################################
+=head2 C<< pandora_thread_monitoring (I<$pa_config>, I<$dbh>, I<$servers>) >>
+
+Write an agent XML data file to the incoming directory with a status module and a processing-rate module for each producer and consumer thread of the servers in I<$servers>. I<$dbh> is not used. Dies if the file cannot be opened for writing.
+
+=cut
+##########################################################################
+
+sub pandora_thread_monitoring ($$$) {
+	my ($pa_config, $dbh, $servers) = @_;
+	my $utimestamp = time ();
+	my $timestamp = strftime ("%Y-%m-%d %H:%M:%S", localtime($utimestamp));
+
+	# Agent header; the modules of every server are appended below.
+	my $xml_output = "<agent_data os_name='$OS' os_version='$OS_VERSION' version='" . $pa_config->{'version'} . "' description='" . $pa_config->{'rb_product_name'} . " Server version " . $pa_config->{'version'} . "' agent_name='".$pa_config->{'servername'} . "' agent_alias='".$pa_config->{'servername'} . "' interval='".$pa_config->{"self_monitoring_interval"}."' timestamp='".$timestamp."' >";
+	foreach my $server (@{$servers}) {
+		# Producer module names do not include the thread ID, so only the stats are needed.
+		foreach my $stats (values(%{$server->getProducerStats()})) {
+			$xml_output .=" <module>";
+			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Status</name>";
+			$xml_output .=" <type>generic_proc</type>";
+			$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
+			$xml_output .=" </module>";
+	
+			$xml_output .=" <module>";
+			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Processing Rate</name>";
+			$xml_output .=" <type>generic_data</type>";
+			$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
+			$xml_output .=" <unit>tasks/second</unit>";
+			$xml_output .=" </module>";
+		}
+
+		# Number consumers by ascending thread ID; sort numerically, a string sort puts 10 before 9.
+		my $idx = 0;
+		my $consumer_stats = $server->getConsumerStats();
+		foreach my $tid (sort { $a <=> $b } keys(%{$consumer_stats})) {
+			my $stats = $consumer_stats->{$tid};
+			$idx += 1;
+			$xml_output .=" <module>";
+			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Status</name>";
+			$xml_output .=" <type>generic_proc</type>";
+			$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
+			$xml_output .=" </module>";
+	
+			$xml_output .=" <module>";
+			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Processing Rate</name>";
+			$xml_output .=" <type>generic_data</type>";
+			$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
+			$xml_output .=" <unit>tasks/second</unit>";
+			$xml_output .=" </module>";
+		}
+	}
+	$xml_output .= "</agent_data>";
+
+	my $filename = $pa_config->{"incomingdir"}."/".$pa_config->{'servername'}.".threads.".$utimestamp.".data";
+	open (my $xml_fh, ">", $filename) or die "[FATAL] Could not write to the thread monitoring XML file '$filename': $!";
+	print $xml_fh $xml_output;
+	close ($xml_fh);
+}
+
 ##########################################################################
 =head2 C<< xml_module_template (I<$module_name>, I<$module_type>, I<$module_data>) >>
 
diff --git a/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm b/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm
index 11026ae0b2..770bc06522 100644
--- a/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm
+++ b/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm
@@ -89,6 +89,15 @@ sub run ($$$$$) {
 
 	# Launch consumer threads
 	for (1..$self->getNumThreads ()) {
+
+		# Enable consumer stats
+		my $consumer_stats = shared_clone({
+			'tstamp'      => time(),
+			'rate'        => 0,
+			'rate_count'  => 0,
+			'rate_tstamp' => time()
+		});
+
 		my $thr = threads->create ({'exit' => 'thread_only'},
 			sub {
 				my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
@@ -98,13 +107,29 @@ sub run ($$$$$) {
 					$sem->up();
 					exit 0;
 				};
+
+				# Make consumer stats reachable from the thread
+				$self->{'_consumer_stats'}->{threads->tid()} = $consumer_stats;
+
 				PandoraFMS::ProducerConsumerServer::data_consumer->(@_);
 			}, $self, $task_queue, $pending_tasks, $sem, $task_sem
 		);
 		return unless defined ($thr);
 		$self->addThread ($thr->tid ());
+
+		# Make consumer stats reachable from the main program
+		$self->{'_consumer_stats'}->{$thr->tid()} = $consumer_stats;
 	}
 
+
+	# Enable producer stats
+	my $producer_stats = shared_clone({
+		'tstamp'      => time(),
+		'rate'        => 0,
+		'rate_count'  => 0,
+		'rate_tstamp' => time()
+	});
+
 	# Launch producer thread
 	my $thr = threads->create ({'exit' => 'thread_only'},
 		sub {
@@ -115,11 +140,18 @@ sub run ($$$$$) {
 				$sem->up();
 				exit 0;
 			};
+
+			# Make producer stats reachable from the thread
+			$self->{'_producer_stats'}->{threads->tid()} = $producer_stats;
+
 			PandoraFMS::ProducerConsumerServer::data_producer->(@_);
 		}, $self, $task_queue, $pending_tasks, $sem, $task_sem
 	);
 	return unless defined ($thr);
 	$self->addThread ($thr->tid ());
+
+	# Make producer stats reachable from the main program
+	$self->{'_producer_stats'}->{$thr->tid()} = $producer_stats;
 }
 
 ###############################################################################
@@ -130,46 +162,50 @@ sub data_producer ($$$$$) {
 	my $pa_config = $self->getConfig ();
 	my $dbh;
 
-	eval {
-		# Connect to the DB
-		$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
-							  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
-		$self->setDBH ($dbh);
+	while ($RUN == 1) {
+		eval {
+			# Connect to the DB
+			$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
+								  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
+			$self->setDBH ($dbh);
 
-		while ($RUN == 1) {
+			while ($RUN == 1) {
 
-			# Get pending tasks
-			$self->logThread('[PRODUCER] Queuing tasks.');
-			my @tasks = &{$self->{'_producer'}}($self);
-			
-			foreach my $task (@tasks) {
-				$sem->down;
+				# Get pending tasks
+				$self->logThread('[PRODUCER] Queuing tasks.');
+				my @tasks = &{$self->{'_producer'}}($self);
 				
-				last if ($RUN == 0);
-				if (defined $pending_tasks->{$task}) {
-					$sem->up;
-					next;
-				}
+				foreach my $task (@tasks) {
+					$sem->down;
 					
-				# Queue task and signal consumers
-				$pending_tasks->{$task} = 0;
-				push (@{$task_queue}, $task);
-				$task_sem->up;
-				
-				$sem->up;
+					last if ($RUN == 0);
+					if (defined $pending_tasks->{$task}) {
+						$sem->up;
+						next;
+					}
+						
+					# Queue task and signal consumers
+					$pending_tasks->{$task} = 0;
+					push (@{$task_queue}, $task);
+					$task_sem->up;
+					
+					$sem->up;
+				}
+
+				last if ($RUN == 0);
+
+				# Update queue size and thread stats
+				$self->setQueueSize (scalar @{$task_queue});
+				$self->updateProducerStats(scalar(@tasks));
+
+				threads->yield;
+				usleep (int(1e6 * $self->getPeriod()));
 			}
-
-			last if ($RUN == 0);
-			# Update queue size for statistics
-			$self->setQueueSize (scalar @{$task_queue});
-
-			threads->yield;
-			usleep (int(1e6 * $self->getPeriod()));
+		};
+		
+		if ($@) {
+			print STDERR $@;
 		}
-	};
-	
-	if ($@) {
-		$self->setErrStr ($@);
 	}
 	
 	$task_sem->up($self->getNumThreads ());
@@ -185,40 +221,51 @@ sub data_consumer ($$$$$) {
 	my $pa_config = $self->getConfig ();
 
 	my $dbh;
-	eval {
-		# Connect to the DB
-		$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
-							  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
-		$self->setDBH ($dbh);
+	my $sem_timeout = $pa_config->{'self_monitoring_interval'} > 0 ?
+	                  $pa_config->{'self_monitoring_interval'} :
+					  300;
+	while ($RUN == 1) {
+		eval {
+			# Connect to the DB
+			$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
+								  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
+			$self->setDBH ($dbh);
 
-		while ($RUN == 1) {
-			# Wait for data
-			$self->logThread('[CONSUMER] Waiting for data.');
-			$task_sem->down;
+			while ($RUN == 1) {
+				# Wait for data
+				$self->logThread('[CONSUMER] Waiting for data.');
+				while (!$task_sem->down_timed($sem_timeout)) {
+					$self->updateConsumerStats(0);
+				}
 
-			$sem->down;
-			last if ($RUN == 0);
-			my $task = shift (@{$task_queue});
-			$sem->up;
+				last if ($RUN == 0);
 
-			# The consumer was waiting for data when the producer exited
-			last if ($RUN == 0);
-			
-			# Execute task
-			$self->logThread("[CONSUMER] Executing task: $task");
-			&{$self->{'_consumer'}}($self, $task);
+				$sem->down;
+				my $task = shift (@{$task_queue});
+				$sem->up;
 
-			# Update task status
-			$sem->down;
-			delete ($pending_tasks->{$task});
-			$sem->up;
+				# The consumer was waiting for data when the producer exited
+				last if ($RUN == 0);
+				
+				# Execute task
+				$self->logThread("[CONSUMER] Executing task: $task");
+				&{$self->{'_consumer'}}($self, $task);
 
-			threads->yield;
+				# Update thread stats
+				$self->updateConsumerStats(1);
+
+				# Update task status
+				$sem->down;
+				delete ($pending_tasks->{$task});
+				$sem->up;
+
+				threads->yield;
+			}
+		};
+
+		if ($@) {
+			print STDERR $@;
 		}
-	};
-
-	if ($@) {
-		$self->setErrStr ($@);
 	}
 
 	db_disconnect ($dbh);
diff --git a/pandora_server/lib/PandoraFMS/Server.pm b/pandora_server/lib/PandoraFMS/Server.pm
index 41a07b1418..9009c2bbf3 100644
--- a/pandora_server/lib/PandoraFMS/Server.pm
+++ b/pandora_server/lib/PandoraFMS/Server.pm
@@ -47,7 +47,9 @@ sub new ($$$;$) {
 		_threads => [],
 		_queue_size => 0,
 		_errstr => '',
-		_period => 0
+		_period => 0,
+		_producer_stats => {},
+		_consumer_stats => {},
 	};
 	
 	# Share variables that may be set from different threads
@@ -176,6 +178,24 @@ sub getServerType ($) {
 	return $self->{'_server_type'};
 }
 
+########################################################################################
+# Return the consumer stats hash reference (entries are keyed by thread ID).
+########################################################################################
+sub getConsumerStats ($) {
+	my ($self) = @_;
+
+	return $self->{'_consumer_stats'};
+}
+
+########################################################################################
+# Return the producer stats hash reference (entries are keyed by thread ID).
+########################################################################################
+sub getProducerStats ($) {
+	my ($self) = @_;
+
+	return $self->{'_producer_stats'};
+}
+
 ########################################################################################
 # Set error string.
 ########################################################################################
@@ -337,6 +357,52 @@ sub stop ($) {
 	}
 }
 
+########################################################################################
+# Update stats for the current thread: refresh its timestamp, add $inc to its task
+# count and, once per self-monitoring interval, recompute its processing rate.
+########################################################################################
+sub updateStats ($$$) {
+	my ($self, $dest, $inc) = @_;
+	my $tid = threads->tid();
+	my $curr_time = time();
+
+	# Stats disabled for this thread.
+	my $stats = $dest->{$tid};
+	return unless defined($stats);
+
+	# Update the timestamp and count, using a single clock reading.
+	$stats->{'tstamp'} = $curr_time;
+	$stats->{'rate_count'} += $inc;
+
+	# Compute the processing rate. Require a positive elapsed time so that a zero
+	# or undefined self_monitoring_interval cannot cause a division by zero.
+	my $elapsed = $curr_time - $stats->{'rate_tstamp'};
+	if ($elapsed > 0 && $elapsed >= $self->{'_pa_config'}->{'self_monitoring_interval'}) {
+		$stats->{'rate'} = $stats->{'rate_count'} / $elapsed;
+		$stats->{'rate_count'} = 0;
+		$stats->{'rate_tstamp'} = $curr_time;
+	}
+}
+
+
+########################################################################################
+# Record the tasks queued by the producer in the calling thread's producer stats.
+########################################################################################
+sub updateProducerStats ($$) {
+	my $self = shift;
+	my $queued_tasks = shift;
+	$self->updateStats($self->{'_producer_stats'}, $queued_tasks);
+}
+
+########################################################################################
+# Record the tasks processed by a consumer in the calling thread's consumer stats.
+########################################################################################
+sub updateConsumerStats ($$) {
+	my $self = shift;
+	my $processed_tasks = shift;
+	$self->updateStats($self->{'_consumer_stats'}, $processed_tasks);
+}
+
 # End of function declaration
 # End of defined Code
 

From 47fba1d6f41c4d1acc9c70ce5123c988363cee4e Mon Sep 17 00:00:00 2001
From: Ramon Novoa <ramon.novoa@pandorafms.com>
Date: Wed, 21 Dec 2022 18:15:43 +0100
Subject: [PATCH 2/2] Add thread-monitoring modules to specific module groups.

---
 pandora_server/lib/PandoraFMS/Core.pm | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pandora_server/lib/PandoraFMS/Core.pm b/pandora_server/lib/PandoraFMS/Core.pm
index 18db0d3b64..5fb8bf8ccb 100644
--- a/pandora_server/lib/PandoraFMS/Core.pm
+++ b/pandora_server/lib/PandoraFMS/Core.pm
@@ -6089,12 +6089,14 @@ sub pandora_thread_monitoring ($$$) {
 			$xml_output .=" <module>";
 			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Status</name>";
 			$xml_output .=" <type>generic_proc</type>";
+			$xml_output .=" <module_group>System</module_group>";
 			$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
 			$xml_output .=" </module>";
 	
 			$xml_output .=" <module>";
 			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Producer Processing Rate</name>";
 			$xml_output .=" <type>generic_data</type>";
+			$xml_output .=" <module_group>Performance</module_group>";
 			$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
 			$xml_output .=" <unit>tasks/second</unit>";
 			$xml_output .=" </module>";
@@ -6109,12 +6111,14 @@ sub pandora_thread_monitoring ($$$) {
 			$xml_output .=" <module>";
 			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Status</name>";
 			$xml_output .=" <type>generic_proc</type>";
+			$xml_output .=" <module_group>System</module_group>";
 			$xml_output .=" <data>" . (time() - $stats->{'tstamp'} < 2 * $pa_config->{"self_monitoring_interval"} ? 1 : 0) . "</data>";
 			$xml_output .=" </module>";
 	
 			$xml_output .=" <module>";
 			$xml_output .=" <name>" . uc($ServerTypes[$server->{'_server_type'}]) . " Consumer #$idx Processing Rate</name>";
 			$xml_output .=" <type>generic_data</type>";
+			$xml_output .=" <module_group>Performance</module_group>";
 			$xml_output .=" <data>" . $stats->{'rate'} . "</data>";
 			$xml_output .=" <unit>tasks/second</unit>";
 			$xml_output .=" </module>";