2008-04-22 Ramon Novoa <rnovoa@artica.es>

* bin/pandora_plugin, bin/pandora_prediction,
          bin/pandora_network: Consumer threads now check for data
          availability after lock() to avoid race conditions.




git-svn-id: https://svn.code.sf.net/p/pandora/code/trunk@814 c3f86ba8-e40f-0410-aaad-9ba5e7f4b01f
This commit is contained in:
Ramon Novoa 2008-04-22 14:35:19 +00:00
parent 3dccbc5610
commit 65affa8984
4 changed files with 92 additions and 83 deletions

View File

@ -1,3 +1,9 @@
2008-04-22 Ramon Novoa <rnovoa@artica.es>
* bin/pandora_plugin, bin/pandora_prediction,
bin/pandora_network: Consumer threads now check for data
availability after lock() to avoid race conditions.
2008-04-22 Ramon Novoa <rnovoa@artica.es> 2008-04-22 Ramon Novoa <rnovoa@artica.es>
* bin/pandora_server: The data server now uses the producer/consumer * bin/pandora_server: The data server now uses the producer/consumer

View File

@ -144,7 +144,7 @@ sub pandora_network_consumer ($$) {
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 });
my $counter =0; my $counter =0;
while (1) { LOOP: while (1) {
if ($counter > 10) { if ($counter > 10) {
sleep (1); sleep (1);
$counter = 0; $counter = 0;
@ -152,34 +152,35 @@ sub pandora_network_consumer ($$) {
# Take the first element on the shared queue # Take the first element on the shared queue
# Insert this element on the current task hash # Insert this element on the current task hash
if (scalar(@pending_task) > 0){ {
{ lock $queue_lock;
lock $queue_lock; if (scalar(@pending_task) == 0){
$data_id_agent_module = shift(@pending_task); $counter++;
delete($pending_task_hash{$data_id_agent_module}); next LOOP;
$current_task_hash{$data_id_agent_module}=1;
}
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Network Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
} }
# Remove from queue. If catch an error, probably data is $data_id_agent_module = shift(@pending_task);
# not been processed, but has been freed from task queue delete($pending_task_hash{$data_id_agent_module});
{ $current_task_hash{$data_id_agent_module}=1;
lock $queue_lock;
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} else {
$counter ++;
} }
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Network Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
}
# Remove from queue. If catch an error, probably data is
# not been processed, but has been freed from task queue
{
lock $queue_lock;
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} }
} }

View File

@ -132,7 +132,7 @@ sub pandora_plugin_consumer ($$) {
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 });
my $counter =0; my $counter =0;
while (1) { LOOP: while (1) {
if ($counter > 10) { if ($counter > 10) {
sleep (1); sleep (1);
$counter = 0; $counter = 0;
@ -140,38 +140,39 @@ sub pandora_plugin_consumer ($$) {
# Take the first element on the shared queue # Take the first element on the shared queue
# Insert this element on the current task hash # Insert this element on the current task hash
if (scalar(@pending_task) > 0){ {
{ lock $queue_lock;
lock $queue_lock; if (scalar(@pending_task) == 0){
$data_id_agent_module = shift(@pending_task); $counter++;
#print "[CLIENT] Pop out of queue module (pending queue) $data_id_agent_module \n"; next LOOP;
delete($pending_task_hash{$data_id_agent_module});
$current_task_hash{$data_id_agent_module}=1;
}
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
# exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
print "[CLIENT] Executing module $data_id_agent_module \n";
exec_plugin_module ($pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Plugin Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
} }
# Remove from queue. If catch an error, probably data is $data_id_agent_module = shift(@pending_task);
# not been processed, but has been freed from task queue #print "[CLIENT] Pop out of queue module (pending queue) $data_id_agent_module \n";
{ delete($pending_task_hash{$data_id_agent_module});
lock $queue_lock; $current_task_hash{$data_id_agent_module}=1;
#print "[CLIENT] Removing from queue (current task) module $data_id_agent_module \n";
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} else {
$counter ++;
} }
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
# exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
print "[CLIENT] Executing module $data_id_agent_module \n";
exec_plugin_module ($pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Plugin Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
}
# Remove from queue. If catch an error, probably data is
# not been processed, but has been freed from task queue
{
lock $queue_lock;
#print "[CLIENT] Removing from queue (current task) module $data_id_agent_module \n";
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} }
} }

View File

@ -128,7 +128,7 @@ sub pandora_prediction_consumer ($$) {
my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 });
my $counter =0; my $counter =0;
while (1) { LOOP: while (1) {
if ($counter > 10) { if ($counter > 10) {
sleep (1); sleep (1);
$counter = 0; $counter = 0;
@ -136,35 +136,36 @@ sub pandora_prediction_consumer ($$) {
# Take the first element on the shared queue # Take the first element on the shared queue
# Insert this element on the current task hash # Insert this element on the current task hash
if (scalar(@pending_task) > 0){ {
{ lock $queue_lock;
lock $queue_lock; if (scalar(@pending_task) == 0){
$data_id_agent_module = shift(@pending_task); $counter++;
delete($pending_task_hash{$data_id_agent_module}); next LOOP;
$current_task_hash{$data_id_agent_module}=1;
}
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
# exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
exec_prediction_module ($pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Prediction Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
} }
# Remove from queue. If catch an error, probably data is $data_id_agent_module = shift(@pending_task);
# not been processed, but has been freed from task queue delete($pending_task_hash{$data_id_agent_module});
{ $current_task_hash{$data_id_agent_module}=1;
lock $queue_lock;
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} else {
$counter ++;
} }
# Executing network task with unmanaged error trapping
eval {
# Call network execution process
# exec_network_module ( $pa_config, $data_id_agent_module, $dbh);
exec_prediction_module ($pa_config, $data_id_agent_module, $dbh);
};
if ($@){
logger ($pa_config, "[ERROR] Prediction Task for module $data_id_agent_module causes a system exception", 0);
logger ($pa_config, "ERROR Code: $@", 1);
}
# Remove from queue. If catch an error, probably data is
# not been processed, but has been freed from task queue
{
lock $queue_lock;
delete($current_task_hash{$data_id_agent_module});
}
$counter = 0;
} }
} }