Add support for forking servers.
This commit is contained in:
parent
c6a9dce7b2
commit
6d1c2b38fa
|
@ -132,18 +132,6 @@ sub pandora_startup () {
|
|||
# Grab config tokens shared with the console and not in the .conf
|
||||
pandora_get_sharedconfig (\%Config, $DBH);
|
||||
|
||||
# Kill any running server threads.
|
||||
stop_server_threads();
|
||||
|
||||
# Start the task execution thread.
|
||||
start_server_thread(\&pandora_server_tasks, [\%Config]);
|
||||
|
||||
# Start the policy queue thread.
|
||||
start_server_thread(\&pandora_process_policy_queue, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'policy_manager'} == 1);
|
||||
|
||||
# Start agent autoconfiguration thread.
|
||||
start_server_thread(\&pandora_agent_autoconfiguration_scheduled, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'autoconfigure_agents'} == 1);
|
||||
|
||||
pandora_audit (\%Config, $Config{'rb_product_name'} . ' Server Daemon starting', 'SYSTEM', 'System', $DBH);
|
||||
|
||||
# Load servers
|
||||
|
@ -168,6 +156,18 @@ sub pandora_startup () {
|
|||
# There are enterprise metaconsole servers!
|
||||
enterprise_hook('load_enterprise_servers', [\@Servers, \%Config, $DBH]);
|
||||
|
||||
# Kill any running server threads.
|
||||
stop_server_threads();
|
||||
|
||||
# Start the task execution thread.
|
||||
start_server_thread(\&pandora_server_tasks, [\%Config]);
|
||||
|
||||
# Start the policy queue thread.
|
||||
start_server_thread(\&pandora_process_policy_queue, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'policy_manager'} == 1);
|
||||
|
||||
# Start agent autoconfiguration thread.
|
||||
start_server_thread(\&pandora_agent_autoconfiguration_scheduled, [\%Config]) if ($Config{'__enterprise_enabled'} == 1 && $Config{'autoconfigure_agents'} == 1);
|
||||
|
||||
# Start the netflow daemon if necessary
|
||||
pandora_start_netflow_daemon ();
|
||||
|
||||
|
@ -806,7 +806,13 @@ sub main() {
|
|||
|
||||
# Update server status
|
||||
foreach my $server (@Servers) {
|
||||
die ($server->getErrStr ()) unless ($server->checkThreads () == 1);
|
||||
|
||||
# Check server threads.
|
||||
die ($server->getErrStr()) unless ($server->checkThreads() == 1);
|
||||
|
||||
# Check (and restart if needed) child processes.
|
||||
$server->run() if ($server->checkProc() == 0);
|
||||
|
||||
$server->update();
|
||||
}
|
||||
|
||||
|
|
|
@ -816,3 +816,6 @@ madeserver_autofit 7d
|
|||
# Model sensitivity. A lower value triggers less anomalies (PANDORA FMS ENTERPRISE ONLY).
|
||||
madeserver_sensitivity 0.1
|
||||
|
||||
# If set to 1, Pandora FMS servers will run in separate processes.
|
||||
fork 0
|
||||
|
||||
|
|
|
@ -589,6 +589,8 @@ sub pandora_load_config {
|
|||
|
||||
$pa_config->{"madeserver"} = 0; # 774.
|
||||
|
||||
$pa_config->{"fork"} = 0; # 775.
|
||||
|
||||
# Check for UID0
|
||||
if ($pa_config->{"quiet"} != 0){
|
||||
if ($> == 0){
|
||||
|
@ -1412,6 +1414,9 @@ sub pandora_load_config {
|
|||
elsif ($parametro =~ m/^madeserver\s+([0-1])/i){
|
||||
$pa_config->{'madeserver'}= clean_blank($1);
|
||||
}
|
||||
elsif ($parametro =~ m/^fork\s+([0-1])/i){
|
||||
$pa_config->{'fork'}= clean_blank($1);
|
||||
}
|
||||
} # end of loop for parameter #
|
||||
|
||||
# The DB host was overridden by pandora_ha.
|
||||
|
|
|
@ -156,7 +156,7 @@ sub db_connect ($$$$$$) {
|
|||
$RDBMS_QUOTE_STRING = '"';
|
||||
|
||||
# Connect to MySQL
|
||||
my $dbh = DBI->connect("DBI:mysql:$db_name:$db_host:$db_port;$SSL_OPTS", $db_user, $db_pass, { RaiseError => 1, AutoCommit => 1 });
|
||||
my $dbh = DBI->connect("DBI:mysql:$db_name:$db_host:$db_port;$SSL_OPTS", $db_user, $db_pass, { RaiseError => 1, AutoCommit => 1, AutoInactiveDestroy => 1 });
|
||||
return undef unless defined ($dbh);
|
||||
|
||||
# Enable auto reconnect
|
||||
|
|
|
@ -31,6 +31,7 @@ BEGIN { push @INC, '/usr/lib/perl5'; }
|
|||
use PandoraFMS::DB;
|
||||
use PandoraFMS::Core;
|
||||
use PandoraFMS::Server;
|
||||
use PandoraFMS::Tools;
|
||||
|
||||
# Inherits from PandoraFMS::Server
|
||||
our @ISA = qw(PandoraFMS::Server);
|
||||
|
@ -48,10 +49,16 @@ sub new ($$$$$;$) {
|
|||
# Call the constructor of the parent class
|
||||
my $self = $class->SUPER::new($config, $server_type, $dbh);
|
||||
|
||||
# Set producer/consumer functions
|
||||
# Set producer/consumer functions and wrappers
|
||||
$self->{'_producer_wrapper'} = \&PandoraFMS::ProducerConsumerServer::data_producer;
|
||||
$self->{'_consumer_wrapper'} = \&PandoraFMS::ProducerConsumerServer::data_consumer;
|
||||
$self->{'_producer'} = $producer;
|
||||
$self->{'_consumer'} = $consumer;
|
||||
|
||||
# Configure forking
|
||||
$self->{'_fork'} = $config->{'fork'} == 1 ? 1 : 0;
|
||||
$self->{'_child_pid'} = undef;
|
||||
|
||||
# Run!
|
||||
$RUN = 1;
|
||||
|
||||
|
@ -77,6 +84,15 @@ sub getConsumer ($) {
|
|||
return $self->{'_consumer'};
|
||||
}
|
||||
|
||||
########################################################################################
|
||||
# Enable forking.
|
||||
########################################################################################
|
||||
sub setFork ($) {
|
||||
my $self = shift;
|
||||
|
||||
$self->{'_fork'} = 1;
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
# Run.
|
||||
###############################################################################
|
||||
|
@ -87,6 +103,29 @@ sub run ($$$$$) {
|
|||
$self->update ();
|
||||
$self->setServerID ();
|
||||
|
||||
# Run the server in a new process.
|
||||
if ($self->{'_fork'} == 1) {
|
||||
$SIG{CHLD} = 'IGNORE';
|
||||
$self->{'_child_pid'} = fork();
|
||||
die($!) unless defined($self->{'_child_pid'});
|
||||
}
|
||||
|
||||
# The parent should exit.
|
||||
# The child will start the producer/consumer threads.
|
||||
if (defined($self->{'_child_pid'})) {
|
||||
if ($self->{'_child_pid'} != 0) {
|
||||
return;
|
||||
} else {
|
||||
# Rename the process to prevent conflicts.
|
||||
my $suffix = lc(get_server_name($self->getServerType()));
|
||||
$0 =~ s/pandora_server/pandora_$suffix/;
|
||||
|
||||
# Clone the DB handle to prevent errors.
|
||||
$self->{'_dbh'} = $self->{'_dbh'}->clone();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# Launch consumer threads
|
||||
for (1..$self->getNumThreads ()) {
|
||||
|
||||
|
@ -112,9 +151,10 @@ sub run ($$$$$) {
|
|||
# Make consumer stats reachable from the thread
|
||||
$self->{'_consumer_stats'}->{threads->tid()} = $consumer_stats;
|
||||
|
||||
PandoraFMS::ProducerConsumerServer::data_consumer->(@_);
|
||||
$self->{'_consumer_wrapper'}->(@_);
|
||||
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
|
||||
);
|
||||
|
||||
return unless defined ($thr);
|
||||
$self->addThread ($thr->tid ());
|
||||
|
||||
|
@ -122,7 +162,6 @@ sub run ($$$$$) {
|
|||
$self->{'_consumer_stats'}->{$thr->tid()} = $consumer_stats;
|
||||
}
|
||||
|
||||
|
||||
# Enable producer stats
|
||||
my $producer_stats = shared_clone({
|
||||
'tstamp' => time(),
|
||||
|
@ -132,10 +171,8 @@ sub run ($$$$$) {
|
|||
'task_queue' => $task_queue,
|
||||
});
|
||||
|
||||
# Launch producer thread
|
||||
my $thr = threads->create ({'exit' => 'thread_only'},
|
||||
sub {
|
||||
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
|
||||
# When fork is enabled, the child runs the producer in a loop.
|
||||
if (defined($self->{'_child_pid'}) && $self->{'_child_pid'} == 0) {
|
||||
local $SIG{'KILL'} = sub {
|
||||
$RUN = 0;
|
||||
$task_sem->up();
|
||||
|
@ -143,17 +180,33 @@ sub run ($$$$$) {
|
|||
exit 0;
|
||||
};
|
||||
|
||||
# Make producer stats reachable from the thread
|
||||
$self->{'_producer_stats'}->{threads->tid()} = $producer_stats;
|
||||
$self->{'_producer_wrapper'}->($self, $task_queue, $pending_tasks, $sem, $task_sem);
|
||||
exit 0;
|
||||
}
|
||||
# Launch producer thread
|
||||
else {
|
||||
my $thr = threads->create ({'exit' => 'thread_only'},
|
||||
sub {
|
||||
my ($self, $task_queue, $pending_tasks, $sem, $task_sem) = @_;
|
||||
local $SIG{'KILL'} = sub {
|
||||
$RUN = 0;
|
||||
$task_sem->up();
|
||||
$sem->up();
|
||||
exit 0;
|
||||
};
|
||||
|
||||
PandoraFMS::ProducerConsumerServer::data_producer->(@_);
|
||||
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
|
||||
);
|
||||
return unless defined ($thr);
|
||||
$self->addThread ($thr->tid ());
|
||||
# Make producer stats reachable from the thread
|
||||
$self->{'_producer_stats'}->{threads->tid()} = $producer_stats;
|
||||
|
||||
# Make producer stats reachable from the main program
|
||||
$self->{'_producer_stats'}->{$thr->tid()} = $producer_stats;
|
||||
$self->{'_producer_wrapper'}->(@_);
|
||||
}, $self, $task_queue, $pending_tasks, $sem, $task_sem
|
||||
);
|
||||
return unless defined ($thr);
|
||||
$self->addThread ($thr->tid ());
|
||||
|
||||
# Make producer stats reachable from the main program
|
||||
$self->{'_producer_stats'}->{$thr->tid()} = $producer_stats;
|
||||
}
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
|
@ -185,7 +238,7 @@ sub data_producer ($$$$$) {
|
|||
$sem->up;
|
||||
next;
|
||||
}
|
||||
|
||||
|
||||
# Queue task and signal consumers
|
||||
$pending_tasks->{$task} = 0;
|
||||
push (@{$task_queue}, $task);
|
||||
|
@ -209,7 +262,7 @@ sub data_producer ($$$$$) {
|
|||
print STDERR $@;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
$task_sem->up($self->getNumThreads ());
|
||||
db_disconnect ($dbh);
|
||||
exit 0;
|
||||
|
@ -226,6 +279,7 @@ sub data_consumer ($$$$$) {
|
|||
my $sem_timeout = $pa_config->{'self_monitoring_interval'} > 0 ?
|
||||
$pa_config->{'self_monitoring_interval'} :
|
||||
300;
|
||||
|
||||
while ($RUN == 1) {
|
||||
eval {
|
||||
# Connect to the DB
|
||||
|
@ -280,6 +334,10 @@ sub data_consumer ($$$$$) {
|
|||
sub DESTROY {
|
||||
my $self = shift;
|
||||
|
||||
if (defined($self->{'_child_pid'}) && $self->{'_child_pid'} != 0) {
|
||||
kill(9, $self->{'_child_pid'});
|
||||
}
|
||||
|
||||
$RUN = 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -270,6 +270,24 @@ sub checkThreads ($) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
########################################################################################
|
||||
# Returns 1 if the child process is running or there is no child process, 0 otherwise.
|
||||
########################################################################################
|
||||
sub checkProc ($) {
|
||||
my $self = shift;
|
||||
|
||||
# Should there be a child process?
|
||||
if (defined($self->{'_child_pid'}) && $self->{'_child_pid'} != 0) {
|
||||
|
||||
# Is the child process running?
|
||||
if (kill(0, $self->{'_child_pid'}) == 0) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
########################################################################################
|
||||
# Generate a 'going up' event.
|
||||
########################################################################################
|
||||
|
|
Loading…
Reference in New Issue