package PandoraFMS::ProducerConsumerServer;
# Pandora FMS generic server.
# Pandora FMS. the Flexible Monitoring System.
# Copyright (c) 2005-2009 Artica Soluciones Tecnologicas S.L
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; version 2
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Semaphore;

# Default lib dir for RPM and DEB packages
use lib '/usr/lib/perl5';

use PandoraFMS::DB;
use PandoraFMS::Core;
use PandoraFMS::Server;

# Inherits from PandoraFMS::Server
our @ISA = qw(PandoraFMS::Server);

# Tells the producer and consumers to keep running
my $RUN :shared;

# ProducerConsumerServer class constructor.
sub new ($$$$$;$) {
	my ($class, $config, $server_type, $producer,
	    $consumer, $dbh) = @_;

	# Call the constructor of the parent class
	my $self = $class->SUPER::new($config, $server_type, $dbh);

	# Set producer/consumer functions
	$self->{'_producer'} = $producer;
	$self->{'_consumer'} = $consumer;

	# Run!
	$RUN = 1;
    bless $self, $class;
    return $self;

# Get producer function.
sub getProducer ($) {
	my $self = shift;
	return $self->{'_producer'};

# Get consumer function.
sub getConsumer ($) {
	my $self = shift;
	return $self->{'_consumer'};

# Run.
sub run ($$$$$) {
	my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;

	# Update server status and set server ID
	$self->update ();
	$self->setServerID ();

	# Launch consumer threads
	for (1..$self->getNumThreads ()) {
		my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_consumer, $self,
		              $task_queue, $pending_tasks, $sem, $task_sem);
		return unless defined ($thr);
		$self->addThread ($thr->tid ());

	# Launch producer thread
	my $thr = threads->create (\&PandoraFMS::ProducerConsumerServer::data_producer, $self,
	              $task_queue, $pending_tasks, $sem, $task_sem);
	return unless defined ($thr);
	$self->addThread ($thr->tid ());

# Queue pending tasks.
sub data_producer ($$$$$) {
	my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
	my $pa_config = $self->getConfig ();
	my $dbh;

	eval {
		# Connect to the DB
		$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
							  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
		$self->setDBH ($dbh);

		while ($RUN == 1) {

			# Get pending tasks
			my @tasks = &{$self->{'_producer'}}($self);
			foreach my $task (@tasks) {
				if (defined $pending_tasks->{$task}) {
				# Queue task and signal consumers
				$pending_tasks->{$task} = 0;
				push (@{$task_queue}, $task);

			# Update queue size for statistics
			$self->setQueueSize (scalar @{$task_queue});

			sleep ($pa_config->{'server_threshold'});
	if ($@) {
		$self->setErrStr ($@);
	$task_sem->up($self->getNumThreads ());
	db_disconnect ($dbh);

# Execute pending tasks.
sub data_consumer ($$$$$) {
	my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
	my $pa_config = $self->getConfig ();

	my $dbh;
	eval {
		# Connect to the DB
		$dbh = db_connect ($pa_config->{'dbengine'}, $pa_config->{'dbname'}, $pa_config->{'dbhost'}, $pa_config->{'dbport'},
							  $pa_config->{'dbuser'}, $pa_config->{'dbpass'});
		$self->setDBH ($dbh);

		while ($RUN == 1) {

			# Wait for data

			my $task = shift (@{$task_queue});

			# The consumer was waiting for data when the producer exited
			last if ($RUN == 0);
			# Execute task
			&{$self->{'_consumer'}}($self, $task);

			# Update task status
			delete ($pending_tasks->{$task});


	if ($@) {
		$self->setErrStr ($@);

	db_disconnect ($dbh);

# Clean-up when the server is destroyed.
	my $self = shift;
	$RUN = 0;
