More bugfixes for AsyncTask.

This commit is contained in:
Gunnar Beutner 2012-07-15 17:29:59 +02:00
parent 18bbde330c
commit fd38babd98
5 changed files with 76 additions and 46 deletions

View File

@ -65,11 +65,13 @@ public:
*/
void Start(const CompletionCallback& completionCallback)
{
assert(Application::IsMainThread());
m_CompletionCallback = completionCallback;
CallWithExceptionGuard(boost::bind(&AsyncTask<TClass, TResult>::Run, this));
try {
Run();
} catch (const exception& ex) {
FinishException(boost::current_exception());
}
}
/**
@ -101,7 +103,7 @@ public:
*
* @param ex The exception.
*/
void Finish(const boost::exception_ptr& ex)
void FinishException(const boost::exception_ptr& ex)
{
m_Exception = ex;
FinishInternal();
@ -112,7 +114,7 @@ public:
*
* @param result The result.
*/
void Finish(const TResult& result)
void FinishResult(const TResult& result)
{
m_Result = result;
FinishInternal();
@ -127,7 +129,7 @@ public:
* @param function The function that should be invoked.
* @returns true if no exception occured, false otherwise.
*/
bool CallWithExceptionGuard(function<void ()> function)
/*bool CallWithExceptionGuard(function<void ()> function)
{
try {
function();
@ -138,7 +140,7 @@ public:
return false;
}
}
}*/
protected:
virtual void Run(void) = 0;
@ -150,12 +152,7 @@ private:
*/
void InvokeCompletionCallback(void)
{
m_Finished = true;
m_CompletionCallback(GetSelf());
/* Clear callback because the bound function might hold a
* reference to this task. */
m_CompletionCallback = CompletionCallback();
}
/**
@ -166,7 +163,12 @@ private:
{
assert(!m_Finished);
Event::Post(boost::bind(&AsyncTask<TClass, TResult>::InvokeCompletionCallback, this));
m_Finished = true;
m_CompletionCallback(GetSelf());
/* Clear callback because the bound function might hold a
* reference to this task. */
m_CompletionCallback = CompletionCallback();
}
CompletionCallback m_CompletionCallback; /**< The completion callback. */

View File

@ -97,7 +97,7 @@ void Process::WorkerThreadProc(void)
it++;
tasks.erase(prev);
task->Finish(task->m_Result);
Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
} else {
it++;
}
@ -112,10 +112,14 @@ void Process::WorkerThreadProc(void)
lock.unlock();
if (task->CallWithExceptionGuard(boost::bind(&Process::InitTask, task))) {
try {
task->InitTask();
int fd = task->GetFD();
if (fd >= 0)
tasks[fd] = task;
} catch (const exception&) {
Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
}
lock.lock();

View File

@ -21,6 +21,10 @@
using namespace icinga;
NagiosCheckTask::NagiosCheckTask(const ScriptTask::Ptr& task, const Process::Ptr& process)
: m_Task(task), m_Process(process)
{ }
void NagiosCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Variant>& arguments)
{
if (arguments.size() < 1)
@ -49,14 +53,22 @@ void NagiosCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Varia
time(&now);
result.SetScheduleStart(now);
Process::Ptr process = boost::make_shared<Process>(command, boost::bind(&NagiosCheckTask::ProcessFinishedHandler, task, _1, result));
process->Start();
Process::Ptr process = boost::make_shared<Process>(command);
NagiosCheckTask ct(task, process);
process->Start(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, ct, result));
}
void NagiosCheckTask::ProcessFinishedHandler(const ScriptTask::Ptr& task, const Process::Ptr& process, CheckResult result)
void NagiosCheckTask::ProcessFinishedHandler(NagiosCheckTask ct, CheckResult result)
{
ProcessResult pr;
pr = process->GetResult();
try {
pr = ct.m_Process->GetResult();
} catch (const exception&) {
ct.m_Task->FinishException(boost::current_exception());
return;
}
result.SetExecutionStart(pr.ExecutionStart);
result.SetExecutionEnd(pr.ExecutionEnd);
@ -88,7 +100,7 @@ void NagiosCheckTask::ProcessFinishedHandler(const ScriptTask::Ptr& task, const
time(&now);
result.SetScheduleEnd(now);
task->Finish(result.GetDictionary());
ct.m_Task->FinishResult(result.GetDictionary());
}
void NagiosCheckTask::ProcessCheckOutput(CheckResult& result, const string& output)

View File

@ -31,8 +31,13 @@ public:
static void Register(void);
private:
static void ProcessFinishedHandler(const ScriptTask::Ptr& task, const Process::Ptr& process, CheckResult result);
static void ProcessFinishedHandler(NagiosCheckTask ct, CheckResult result);
static void ProcessCheckOutput(CheckResult& result, const string& output);
NagiosCheckTask(const ScriptTask::Ptr& task, const Process::Ptr& process);
ScriptTask::Ptr m_Task;
Process::Ptr m_Process;
};
}

View File

@ -103,41 +103,48 @@ void CheckerComponent::CheckCompletedHandler(Service service, const ScriptTask::
{
service.RemoveTag("current_task");
/* if the service isn't in the set of pending services
* it was removed and we need to ignore this check result. */
if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
return;
try {
Variant vresult = task->GetResult();
Variant vresult = task->GetResult();
bool hasResult = false;
if (vresult.IsObjectType<Dictionary>()) {
CheckResult result = CheckResult(static_cast<Dictionary::Ptr>(vresult));
if (vresult.IsObjectType<Dictionary>()) {
CheckResult result = CheckResult(static_cast<Dictionary::Ptr>(vresult));
/* update service state */
service.ApplyCheckResult(result);
/* update service state */
service.ApplyCheckResult(result);
RequestMessage rm;
rm.SetMethod("checker::CheckResult");
RequestMessage rm;
rm.SetMethod("checker::CheckResult");
ServiceStatusMessage params;
params.SetService(service.GetName());
params.SetState(service.GetState());
params.SetStateType(service.GetStateType());
params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
params.SetNextCheck(service.GetNextCheck());
params.SetCheckResult(result);
ServiceStatusMessage params;
params.SetService(service.GetName());
params.SetState(service.GetState());
params.SetStateType(service.GetStateType());
params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
params.SetNextCheck(service.GetNextCheck());
params.SetCheckResult(result);
rm.SetParams(params);
rm.SetParams(params);
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
}
} catch (const exception& ex) {
stringstream msgbuf;
msgbuf << "Exception occured during check for service '"
<< service.GetName() << "': " << ex.what();
Logger::Write(LogWarning, "checker", msgbuf.str());
}
/* figure out when the next check is for this service */
service.UpdateNextCheck();
/* remove the service from the list of pending services */
m_PendingServices.erase(service.GetConfigObject());
m_Services.push(service);
/* remove the service from the list of pending services; if it's not in the
* list this was a manual (i.e. forced) check and we must not re-add the
* service to the services list because it's already there. */
if (m_PendingServices.find(service.GetConfigObject()) != m_PendingServices.end()) {
m_PendingServices.erase(service.GetConfigObject());
m_Services.push(service);
}
Logger::Write(LogDebug, "checker", "Check finished for service '" + service.GetName() + "'");
}