mirror of https://github.com/pandorafms/pandorafms.git
Add support for parallel queries.

parent dce2b3f915
commit 2a8782b73e

@@ -19,6 +19,9 @@ package PandoraFMS::DB;
 use strict;
 use warnings;
+
+use threads;
+
 use DBI;
 use Carp qw/croak/;
 
@@ -72,8 +75,11 @@ our @EXPORT = qw(
 	get_alert_template_name
 	get_command_id
 	get_console_api_url
+	get_db_nodes
 	get_db_rows
 	get_db_rows_limit
+	get_db_rows_node
+	get_db_rows_parallel
 	get_db_single_row
 	get_db_value
 	get_db_value_limit
@@ -947,6 +953,37 @@ sub get_db_single_row ($$;@) {
 	return undef;
 }
 
+##########################################################################
+## Get DB information for all known Pandora FMS nodes.
+##########################################################################
+sub get_db_nodes ($$) {
+	my ($dbh, $pa_config) = @_;
+	my $dbh_nodes = [];
+
+	# Insert the current node first.
+	push(@{$dbh_nodes},
+		{'dbengine' => $pa_config->{'dbengine'},
+		 'dbname' => $pa_config->{'dbname'},
+		 'dbhost' => $pa_config->{'dbhost'},
+		 'dbport' => $pa_config->{'dbport'},
+		 'dbuser' => $pa_config->{'dbuser'},
+		 'dbpass' => $pa_config->{'dbpass'}});
+
+	# Look for additional nodes.
+	my @nodes = get_db_rows($dbh, 'SELECT * FROM tmetaconsole_setup');
+	foreach my $node (@nodes) {
+		push(@{$dbh_nodes},
+			{'dbengine' => $pa_config->{'dbengine'},
+			 'dbname' => $node->{'dbname'},
+			 'dbhost' => $node->{'dbhost'},
+			 'dbport' => $node->{'dbport'},
+			 'dbuser' => $node->{'dbuser'},
+			 'dbpass' => $node->{'dbpass'}});
+	}
+
+	return $dbh_nodes;
+}
+
 ##########################################################################
 ## Get all rows returned by an SQL query as a hash reference array.
 ##########################################################################
@@ -961,18 +998,67 @@ sub get_db_rows ($$;@) {
 
 	# Save returned rows
 	while (my $row = $sth->fetchrow_hashref()) {
-		if ($RDBMS eq 'oracle') {
-			push (@rows, {map { lc ($_) => $row->{$_} } keys (%{$row})});
-		}
-		else {
-			push (@rows, $row);
-		}
+		push (@rows, $row);
 	}
 
 	$sth->finish();
 	return @rows;
 }
 
+##########################################################################
+## Connect to the given node and run get_db_rows.
+##########################################################################
+sub get_db_rows_node ($$;@) {
+	my ($node, $query, @values) = @_;
+	my $dbh;
+	my @rows;
+
+	eval {
+		$dbh = db_connect($node->{'dbengine'},
+			$node->{'dbname'},
+			$node->{'dbhost'},
+			$node->{'dbport'},
+			$node->{'dbuser'},
+			$node->{'dbpass'});
+		@rows = get_db_rows($dbh, $query, @values);
+	};
+
+	db_disconnect($dbh) if defined($dbh);
+
+	return \@rows;
+}
+
+##########################################################################
+## Run get_db_rows on all known Pandora FMS nodes in parallel.
+##########################################################################
+sub get_db_rows_parallel ($$;@) {
+	my ($nodes, $query, @values) = @_;
+
+	# Launch the queries.
+	my @threads;
+	{
+		# Calling DESTROY would make the server restart.
+		no warnings 'redefine';
+		local *PandoraFMS::ProducerConsumerServer::DESTROY = sub {};
+		local *PandoraFMS::BlockProducerConsumerServer::DESTROY = sub {};
+
+		# Query the nodes.
+		foreach my $node (@{$nodes}) {
+			my $thr = threads->create(\&get_db_rows_node, $node, $query, @values);
+			push(@threads, $thr) if defined($thr);
+		}
+	}
+
+	# Retrieve the results.
+	my @combined_res;
+	foreach my $thr (@threads) {
+		my $res = $thr->join();
+		push(@combined_res, @{$res}) if defined($res);
+	}
+
+	return @combined_res;
+}
+
 ########################################################################
 ## Get all rows (with a limit clause) returned by an SQL query
 ## as a hash reference array.
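
For reference, a minimal usage sketch of the new helpers (not part of the commit): get_db_nodes() collects connection details for the current node plus every row in tmetaconsole_setup, and get_db_rows_parallel() runs the same query against each node in its own thread and merges the rows. The connection settings and the tagente query below are illustrative placeholders.

	#!/usr/bin/perl
	# Usage sketch only; the connection values below are placeholders.
	use strict;
	use warnings;

	use PandoraFMS::DB;

	my %pa_config = (
		'dbengine' => 'mysql',
		'dbname'   => 'pandora',
		'dbhost'   => '127.0.0.1',
		'dbport'   => '3306',
		'dbuser'   => 'pandora',
		'dbpass'   => 'pandora',
	);

	# Connect to the metaconsole database and build the node list
	# (the current node plus every entry in tmetaconsole_setup).
	my $dbh = db_connect($pa_config{'dbengine'}, $pa_config{'dbname'},
		$pa_config{'dbhost'}, $pa_config{'dbport'},
		$pa_config{'dbuser'}, $pa_config{'dbpass'});
	my $nodes = get_db_nodes($dbh, \%pa_config);

	# Run the same query on every node in parallel and merge the returned rows.
	my @rows = get_db_rows_parallel($nodes,
		'SELECT COUNT(*) AS total FROM tagente WHERE disabled = ?', 0);
	print "Agents per node: " . join(', ', map { $_->{'total'} } @rows) . "\n";

	db_disconnect($dbh);

Note that get_db_rows_node() wraps the connection and the query in an eval, so an unreachable node simply contributes no rows to the merged result instead of aborting the whole call.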