pandorafms/pandora_server/lib/PandoraFMS/ProducerConsumerServer.pm

352 lines
9.4 KiB
Perl
Raw Normal View History

package PandoraFMS::ProducerConsumerServer;
##########################################################################
# Pandora FMS generic server.
# Pandora FMS. the Flexible Monitoring System. http://www.pandorafms.org
##########################################################################
2023-07-03 17:20:25 +02:00
# Copyright (c) 2005-2023 Pandora FMS
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; version 2
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##########################################################################
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;
use Time::HiRes qw(usleep);
# Default lib dir for RPM and DEB packages
BEGIN { push @INC, '/usr/lib/perl5'; }
use PandoraFMS::DB;
use PandoraFMS::Core;
use PandoraFMS::Server;
2024-01-22 10:28:07 +01:00
use PandoraFMS::Tools;
# Inherits from PandoraFMS::Server
our @ISA = qw(PandoraFMS::Server);
# Tells the producer and consumers to keep running
my $RUN :shared;
########################################################################################
# ProducerConsumerServer class constructor.
########################################################################################
sub new ($$$$$;$) {
my ($class, $config, $server_type, $producer,
$consumer, $dbh) = @_;
# Call the constructor of the parent class
my $self = $class->SUPER::new($config, $server_type, $dbh);
2024-01-22 10:28:07 +01:00
# Set producer/consumer functions and wrappers
$self->{'_producer_wrapper'} = \&PandoraFMS::ProducerConsumerServer::data_producer;
$self->{'_consumer_wrapper'} = \&PandoraFMS::ProducerConsumerServer::data_consumer;
$self->{'_producer'} = $producer;
$self->{'_consumer'} = $consumer;
2024-01-22 10:28:07 +01:00
# Configure forking
2024-01-24 11:01:21 +01:00
$self->{'_fork'} = $config->{'multiprocess'} == 1 ? 1 : 0;
2024-01-22 10:28:07 +01:00
$self->{'_child_pid'} = undef;
# Run!
$RUN = 1;
bless $self, $class;
return $self;
}
########################################################################################
# Get producer function.
########################################################################################
sub getProducer ($) {
my $self = shift;
return $self->{'_producer'};
}
########################################################################################
# Get consumer function.
########################################################################################
sub getConsumer ($) {
my $self = shift;
return $self->{'_consumer'};
}
2024-01-22 10:28:07 +01:00
########################################################################################
# Enable forking.
########################################################################################
sub setFork ($) {
my $self = shift;
$self->{'_fork'} = 1;
}
###############################################################################
# Run.
###############################################################################
sub run ($$$$$) {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
# Update server status and set server ID
$self->update ();
$self->setServerID ();
2024-01-22 10:28:07 +01:00
# Run the server in a new process.
if ($self->{'_fork'} == 1) {
# Ignore SIGCHLD.
2024-01-22 10:28:07 +01:00
$SIG{CHLD} = 'IGNORE';
# Fork!
2024-01-22 10:28:07 +01:00
$self->{'_child_pid'} = fork();
die($!) unless defined($self->{'_child_pid'});
}
# The parent should exit.
# The child will start the producer/consumer threads.
if (defined($self->{'_child_pid'})) {
if ($self->{'_child_pid'} != 0) {
return;
} else {
# Restore the SIGCHLD handler.
$SIG{CHLD} = 'DEFAULT';
2024-01-22 10:28:07 +01:00
# Rename the process to prevent conflicts.
my $suffix = lc(get_server_name($self->getServerType()));
$0 =~ s/pandora_server/pandora_$suffix/;
# Clone the DB handle to prevent errors.
$self->{'_dbh'} = $self->{'_dbh'}->clone();
}
}
# Launch consumer threads
for (1..$self->getNumThreads ()) {
# Enable consumer stats
my $consumer_stats = shared_clone({
'tstamp' => time(),
'rate' => 0,
'rate_count' => 0,
2023-06-29 17:06:59 +02:00
'rate_tstamp' => time(),
'task_queue' => $task_queue,
});
2021-05-14 12:43:18 +02:00
my $thr = threads->create ({'exit' => 'thread_only'},
sub {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
local $SIG{'KILL'} = sub {
$RUN = 0;
$task_sem->up();
$sem->up();
exit 0;
};
# Make consumer stats reachable from the thread
$self->{'_consumer_stats'}->{threads->tid()} = $consumer_stats;
2024-01-22 10:28:07 +01:00
$self->{'_consumer_wrapper'}->(@_);
2021-05-14 12:43:18 +02:00
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
2024-01-22 10:28:07 +01:00
return unless defined ($thr);
$self->addThread ($thr->tid ());
# Make consumer stats reachable from the main program
$self->{'_consumer_stats'}->{$thr->tid()} = $consumer_stats;
}
# Enable producer stats
my $producer_stats = shared_clone({
'tstamp' => time(),
'rate' => 0,
'rate_count' => 0,
2023-06-29 17:06:59 +02:00
'rate_tstamp' => time(),
'task_queue' => $task_queue,
});
2024-01-22 10:28:07 +01:00
# When fork is enabled, the child runs the producer in a loop.
if (defined($self->{'_child_pid'}) && $self->{'_child_pid'} == 0) {
2021-05-14 12:43:18 +02:00
local $SIG{'KILL'} = sub {
$RUN = 0;
$task_sem->up();
$sem->up();
exit 0;
};
2024-01-22 10:28:07 +01:00
$self->{'_producer_wrapper'}->($self, $task_queue, $pending_tasks, $sem, $task_sem);
exit 0;
}
# Launch producer thread
else {
my $thr = threads->create ({'exit' => 'thread_only'},
sub {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
local $SIG{'KILL'} = sub {
$RUN = 0;
$task_sem->up();
$sem->up();
exit 0;
};
2024-01-22 10:28:07 +01:00
# Make producer stats reachable from the thread
$self->{'_producer_stats'}->{threads->tid()} = $producer_stats;
2024-01-22 10:28:07 +01:00
$self->{'_producer_wrapper'}->(@_);
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
);
return unless defined ($thr);
$self->addThread ($thr->tid ());
# Make producer stats reachable from the main program
$self->{'_producer_stats'}->{$thr->tid()} = $producer_stats;
}
}
###############################################################################
# Queue pending tasks.
###############################################################################
sub data_producer ($$$$$) {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
my $pa_config = $self->getConfig ();
my $dbh;
while ($RUN == 1) {
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
while ($RUN == 1) {
# Get pending tasks
$self->logThread('[PRODUCER] Queuing tasks.');
my @tasks = &{$self->{'_producer'}}($self);
foreach my $task (@tasks) {
$sem->down;
last if ($RUN == 0);
if (defined $pending_tasks->{$task}) {
$sem->up;
next;
}
2024-01-22 10:28:07 +01:00
# Queue task and signal consumers
$pending_tasks->{$task} = 0;
push (@{$task_queue}, $task);
$task_sem->up;
$sem->up;
}
last if ($RUN == 0);
# Update queue size and thread stats
$self->setQueueSize (scalar @{$task_queue});
$self->updateProducerStats(scalar(@tasks));
threads->yield;
usleep (int(1e6 * $self->getPeriod()));
}
};
if ($@) {
print STDERR $@;
}
}
$task_sem->up($self->getNumThreads ());
db_disconnect ($dbh);
2021-05-14 12:43:18 +02:00
exit 0;
}
###############################################################################
# Execute pending tasks.
###############################################################################
sub data_consumer ($$$$$) {
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
my $pa_config = $self->getConfig ();
my $dbh;
my $sem_timeout = $pa_config->{'self_monitoring_interval'} > 0 ?
$pa_config->{'self_monitoring_interval'} :
300;
while ($RUN == 1) {
eval {
# Connect to the DB
$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
$pa_config->{'dbuser'}, $pa_config->{'dbpass'});
$self->setDBH ($dbh);
while ($RUN == 1) {
# Wait for data
$self->logThread('[CONSUMER] Waiting for data.');
while (!$task_sem->down_timed($sem_timeout)) {
$self->updateConsumerStats(0);
}
last if ($RUN == 0);
$sem->down;
my $task = shift (@{$task_queue});
$sem->up;
# The consumer was waiting for data when the producer exited
last if ($RUN == 0);
# Execute task
$self->logThread("[CONSUMER] Executing task: $task");
&{$self->{'_consumer'}}($self, $task);
# Update thread stats
$self->updateConsumerStats(1);
# Update task status
$sem->down;
delete ($pending_tasks->{$task});
$sem->up;
threads->yield;
}
};
if ($@) {
print STDERR $@;
}
}
db_disconnect ($dbh);
2021-05-14 12:43:18 +02:00
exit 0;
}
###############################################################################
# Clean-up when the server is destroyed.
###############################################################################
sub DESTROY {
my $self = shift;
2024-01-22 10:28:07 +01:00
if (defined($self->{'_child_pid'}) && $self->{'_child_pid'} != 0) {
kill(9, $self->{'_child_pid'});
}
$RUN = 0;
}
1;
__END__