diff --git a/pandora_server/ChangeLog b/pandora_server/ChangeLog index a3d6f073db..67e59918e3 100644 --- a/pandora_server/ChangeLog +++ b/pandora_server/ChangeLog @@ -1,3 +1,9 @@ +2008-04-22 Ramon Novoa + + * bin/pandora_plugin, bin/pandora_prediction, + bin/pandora_network: Consumer threads now check for data + availability after lock() to avoid race conditions. + 2008-04-22 Ramon Novoa * bin/pandora_server: The data server now uses the producer/consumer diff --git a/pandora_server/bin/pandora_network b/pandora_server/bin/pandora_network index c6c49ebc7d..51cee7bc1f 100755 --- a/pandora_server/bin/pandora_network +++ b/pandora_server/bin/pandora_network @@ -144,7 +144,7 @@ sub pandora_network_consumer ($$) { my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $counter =0; - while (1) { + LOOP: while (1) { if ($counter > 10) { sleep (1); $counter = 0; @@ -152,34 +152,35 @@ sub pandora_network_consumer ($$) { # Take the first element on the shared queue # Insert this element on the current task hash - if (scalar(@pending_task) > 0){ - { - lock $queue_lock; - $data_id_agent_module = shift(@pending_task); - delete($pending_task_hash{$data_id_agent_module}); - $current_task_hash{$data_id_agent_module}=1; - } - - # Executing network task with unmanaged error trapping - eval { - # Call network execution process - exec_network_module ( $pa_config, $data_id_agent_module, $dbh); - }; - if ($@){ - logger ($pa_config, "[ERROR] Network Task for module $data_id_agent_module causes a system exception", 0); - logger ($pa_config, "ERROR Code: $@", 1); + { + lock $queue_lock; + if (scalar(@pending_task) == 0){ + $counter++; + next LOOP; } - # Remove from queue. If catch an error, probably data is - # not been processed, but has been freed from task queue - { - lock $queue_lock; - delete($current_task_hash{$data_id_agent_module}); - } - $counter = 0; - } else { - $counter ++; + $data_id_agent_module = shift(@pending_task); + delete($pending_task_hash{$data_id_agent_module}); + $current_task_hash{$data_id_agent_module}=1; } + + # Executing network task with unmanaged error trapping + eval { + # Call network execution process + exec_network_module ( $pa_config, $data_id_agent_module, $dbh); + }; + if ($@){ + logger ($pa_config, "[ERROR] Network Task for module $data_id_agent_module causes a system exception", 0); + logger ($pa_config, "ERROR Code: $@", 1); + } + + # Remove from queue. If catch an error, probably data is + # not been processed, but has been freed from task queue + { + lock $queue_lock; + delete($current_task_hash{$data_id_agent_module}); + } + $counter = 0; } } diff --git a/pandora_server/bin/pandora_plugin b/pandora_server/bin/pandora_plugin index b8bcc51064..507b827372 100755 --- a/pandora_server/bin/pandora_plugin +++ b/pandora_server/bin/pandora_plugin @@ -132,7 +132,7 @@ sub pandora_plugin_consumer ($$) { my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $counter =0; - while (1) { + LOOP: while (1) { if ($counter > 10) { sleep (1); $counter = 0; @@ -140,38 +140,39 @@ sub pandora_plugin_consumer ($$) { # Take the first element on the shared queue # Insert this element on the current task hash - if (scalar(@pending_task) > 0){ - { - lock $queue_lock; - $data_id_agent_module = shift(@pending_task); -#print "[CLIENT] Pop out of queue module (pending queue) $data_id_agent_module \n"; - delete($pending_task_hash{$data_id_agent_module}); - $current_task_hash{$data_id_agent_module}=1; - } - - # Executing network task with unmanaged error trapping - eval { - # Call network execution process - # exec_network_module ( $pa_config, $data_id_agent_module, $dbh); -print "[CLIENT] Executing module $data_id_agent_module \n"; - exec_plugin_module ($pa_config, $data_id_agent_module, $dbh); - }; - if ($@){ - logger ($pa_config, "[ERROR] Plugin Task for module $data_id_agent_module causes a system exception", 0); - logger ($pa_config, "ERROR Code: $@", 1); + { + lock $queue_lock; + if (scalar(@pending_task) == 0){ + $counter++; + next LOOP; } - # Remove from queue. If catch an error, probably data is - # not been processed, but has been freed from task queue - { - lock $queue_lock; -#print "[CLIENT] Removing from queue (current task) module $data_id_agent_module \n"; - delete($current_task_hash{$data_id_agent_module}); - } - $counter = 0; - } else { - $counter ++; + $data_id_agent_module = shift(@pending_task); +#print "[CLIENT] Pop out of queue module (pending queue) $data_id_agent_module \n"; + delete($pending_task_hash{$data_id_agent_module}); + $current_task_hash{$data_id_agent_module}=1; } + + # Executing network task with unmanaged error trapping + eval { + # Call network execution process + # exec_network_module ( $pa_config, $data_id_agent_module, $dbh); +print "[CLIENT] Executing module $data_id_agent_module \n"; + exec_plugin_module ($pa_config, $data_id_agent_module, $dbh); + }; + if ($@){ + logger ($pa_config, "[ERROR] Plugin Task for module $data_id_agent_module causes a system exception", 0); + logger ($pa_config, "ERROR Code: $@", 1); + } + + # Remove from queue. If catch an error, probably data is + # not been processed, but has been freed from task queue + { + lock $queue_lock; +#print "[CLIENT] Removing from queue (current task) module $data_id_agent_module \n"; + delete($current_task_hash{$data_id_agent_module}); + } + $counter = 0; } } diff --git a/pandora_server/bin/pandora_prediction b/pandora_server/bin/pandora_prediction index a2a15a995a..e5a183fdcd 100755 --- a/pandora_server/bin/pandora_prediction +++ b/pandora_server/bin/pandora_prediction @@ -128,7 +128,7 @@ sub pandora_prediction_consumer ($$) { my $dbh = DBI->connect("DBI:mysql:$pa_config->{'dbname'}:$pa_config->{'dbhost'}:3306", $pa_config->{'dbuser'}, $pa_config->{'dbpass'}, { RaiseError => 1, AutoCommit => 1 }); my $counter =0; - while (1) { + LOOP: while (1) { if ($counter > 10) { sleep (1); $counter = 0; @@ -136,35 +136,36 @@ sub pandora_prediction_consumer ($$) { # Take the first element on the shared queue # Insert this element on the current task hash - if (scalar(@pending_task) > 0){ - { - lock $queue_lock; - $data_id_agent_module = shift(@pending_task); - delete($pending_task_hash{$data_id_agent_module}); - $current_task_hash{$data_id_agent_module}=1; - } - - # Executing network task with unmanaged error trapping - eval { - # Call network execution process - # exec_network_module ( $pa_config, $data_id_agent_module, $dbh); - exec_prediction_module ($pa_config, $data_id_agent_module, $dbh); - }; - if ($@){ - logger ($pa_config, "[ERROR] Prediction Task for module $data_id_agent_module causes a system exception", 0); - logger ($pa_config, "ERROR Code: $@", 1); + { + lock $queue_lock; + if (scalar(@pending_task) == 0){ + $counter++; + next LOOP; } - # Remove from queue. If catch an error, probably data is - # not been processed, but has been freed from task queue - { - lock $queue_lock; - delete($current_task_hash{$data_id_agent_module}); - } - $counter = 0; - } else { - $counter ++; + $data_id_agent_module = shift(@pending_task); + delete($pending_task_hash{$data_id_agent_module}); + $current_task_hash{$data_id_agent_module}=1; } + + # Executing network task with unmanaged error trapping + eval { + # Call network execution process + # exec_network_module ( $pa_config, $data_id_agent_module, $dbh); + exec_prediction_module ($pa_config, $data_id_agent_module, $dbh); + }; + if ($@){ + logger ($pa_config, "[ERROR] Prediction Task for module $data_id_agent_module causes a system exception", 0); + logger ($pa_config, "ERROR Code: $@", 1); + } + + # Remove from queue. If catch an error, probably data is + # not been processed, but has been freed from task queue + { + lock $queue_lock; + delete($current_task_hash{$data_id_agent_module}); + } + $counter = 0; } }