Implemented exception support for AsyncTasks.

This commit is contained in:
Gunnar Beutner 2012-07-15 10:58:03 +02:00
parent e6b99c62a7
commit 3e472012d1
17 changed files with 182 additions and 138 deletions

View File

@ -1,53 +1,81 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h" #include "i2-base.h"
using namespace icinga; using namespace icinga;
AsyncTask::AsyncTask(const AsyncTask::CompletionCallback& completionCallback) /**
: m_Finished(false), m_CompletionCallback(completionCallback) * Constructor for the AsyncTaskBase class.
*/
AsyncTaskBase::AsyncTaskBase(void)
: m_Finished(false), m_ResultRetrieved(false)
{ } { }
AsyncTask::~AsyncTask(void) /**
* Destructor for the AsyncTaskBase class.
*/
AsyncTaskBase::~AsyncTaskBase(void)
{ {
if (!m_Finished) { if (!m_Finished) {
Logger::Write(LogCritical, "base", "Contract violation: " Logger::Write(LogCritical, "base", "Contract violation: "
"AsyncTask was destroyed before its completion callback was invoked."); "AsyncTask was destroyed before its completion callback was invoked.");
} else if (!m_ResultRetrieved) {
Logger::Write(LogCritical, "base", "Contract violation: "
"AsyncTask was destroyed before its result was retrieved.");
} }
} }
void AsyncTask::Start(void) /**
* Starts the async task. The caller must hold a reference to the AsyncTask
* object until the completion callback is invoked.
*/
void AsyncTaskBase::Start(void)
{ {
assert(Application::IsMainThread()); assert(Application::IsMainThread());
Run(); CallWithExceptionGuard(boost::bind(&AsyncTaskBase::Run, this));
} }
void AsyncTask::Finish(void) /**
* Finishes the task using an exception.
*
* @param ex The exception.
*/
void AsyncTaskBase::Finish(const boost::exception_ptr& ex)
{ {
Event::Post(boost::bind(&AsyncTask::ForwardCallback, static_cast<AsyncTask::Ptr>(GetSelf()))); m_Exception = ex;
FinishInternal();
} }
void AsyncTask::ForwardCallback(void) /**
* Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed.
*/
void AsyncTaskBase::FinishInternal(void)
{ {
m_CompletionCallback(GetSelf()); assert(!m_Finished);
m_CompletionCallback = CompletionCallback();
m_Finished = true; Event::Post(boost::bind(&AsyncTaskBase::InvokeCompletionCallback,
static_cast<AsyncTaskBase::Ptr>(GetSelf())));
}
/**
* Invokes the provided callback function and catches any exceptions it throws.
* Exceptions are stored into the task so that they can be re-thrown when the
* task owner calls GetResult().
*
* @param task The task where exceptions should be saved.
* @param function The function that should be invoked.
* @returns true if no exception occured, false otherwise.
*/
bool AsyncTaskBase::CallWithExceptionGuard(function<void ()> function)
{
try {
function();
return true;
} catch (const exception&) {
Finish(boost::current_exception());
return false;
}
} }

View File

@ -23,29 +23,98 @@
namespace icinga namespace icinga
{ {
class I2_BASE_API AsyncTask : public Object class I2_BASE_API AsyncTaskBase : public Object
{ {
public: public:
typedef shared_ptr<AsyncTask> Ptr; typedef shared_ptr<AsyncTaskBase> Ptr;
typedef weak_ptr<AsyncTask> WeakPtr; typedef weak_ptr<AsyncTaskBase> WeakPtr;
typedef function<void (const AsyncTask::Ptr&)> CompletionCallback; AsyncTaskBase(void);
~AsyncTaskBase(void);
AsyncTask(const CompletionCallback& completionCallback);
~AsyncTask(void);
void Start(void); void Start(void);
void Finish(const boost::exception_ptr& ex);
void Finish(void); bool CallWithExceptionGuard(function<void ()> function);
protected: protected:
virtual void Run(void) = 0; virtual void Run(void) = 0;
private: virtual void InvokeCompletionCallback(void) = 0;
void ForwardCallback(void);
bool m_Finished; void FinishInternal(void);
CompletionCallback m_CompletionCallback;
bool m_Finished; /**< Whether the task is finished. */
bool m_ResultRetrieved; /**< Whether the result was retrieved. */
boost::exception_ptr m_Exception;
};
/**
* An asynchronous task.
*
* @ingroup base
*/
template<typename TClass, typename TResult>
class AsyncTask : public AsyncTaskBase
{
public:
typedef shared_ptr<AsyncTask<TClass, TResult> > Ptr;
typedef weak_ptr<AsyncTask<TClass, TResult> > WeakPtr;
typedef function<void (const typename shared_ptr<TClass>&)> CompletionCallback;
/**
* Constructor for the AsyncTask class.
*
* @param completionCallback Function that is called when the task is completed.
*/
AsyncTask(const CompletionCallback& completionCallback)
: m_CompletionCallback(completionCallback)
{ }
/**
* Retrieves the result of the task. Throws an exception if one is stored in
* the AsyncTask object.
*
* @returns The task's result.
*/
TResult GetResult(void)
{
if (!m_Finished)
throw runtime_error("GetResult called on an unfinished AsyncTask");
if (m_ResultRetrieved)
throw runtime_error("GetResult called on an AsyncTask whose result was already retrieved.");
if (m_Exception)
boost::rethrow_exception(m_Exception);
return m_Result;
}
void Finish(const TResult& result)
{
m_Result = result;
FinishInternal();
}
private:
/**
* Used by the Finish method to proxy the completion callback into the main
* thread. Invokes the completion callback and marks the task as finished.
*/
virtual void InvokeCompletionCallback(void)
{
m_Finished = true;
m_CompletionCallback(GetSelf());
/* Clear callback because the bound function might hold a
* reference to this task. */
m_CompletionCallback = CompletionCallback();
}
CompletionCallback m_CompletionCallback; /**< The completion callback. */
TResult m_Result; /**< The task's result. */
}; };
} }

View File

@ -79,15 +79,15 @@
<ClCompile Include="process.cpp"> <ClCompile Include="process.cpp">
<Filter>Quelldateien</Filter> <Filter>Quelldateien</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="asynctask.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="scriptfunction.cpp"> <ClCompile Include="scriptfunction.cpp">
<Filter>Quelldateien</Filter> <Filter>Quelldateien</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="scripttask.cpp"> <ClCompile Include="scripttask.cpp">
<Filter>Quelldateien</Filter> <Filter>Quelldateien</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="asynctask.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="application.h"> <ClInclude Include="application.h">

View File

@ -198,7 +198,7 @@ void ConfigObject::RemoveTag(const string& key)
} }
ScriptTask::Ptr ConfigObject::InvokeHook(const string& hook, ScriptTask::Ptr ConfigObject::InvokeHook(const string& hook,
const vector<Variant>& arguments, AsyncTask::CompletionCallback callback) const vector<Variant>& arguments, ScriptTask::CompletionCallback callback)
{ {
Dictionary::Ptr hooks; Dictionary::Ptr hooks;
string funcName; string funcName;

View File

@ -66,7 +66,7 @@ public:
void RemoveTag(const string& key); void RemoveTag(const string& key);
ScriptTask::Ptr InvokeHook(const string& hook, ScriptTask::Ptr InvokeHook(const string& hook,
const vector<Variant>& arguments, AsyncTask::CompletionCallback callback); const vector<Variant>& arguments, ScriptTask::CompletionCallback callback);
string GetType(void) const; string GetType(void) const;
string GetName(void) const; string GetName(void) const;

View File

@ -97,7 +97,7 @@ void Process::WorkerThreadProc(void)
it++; it++;
tasks.erase(prev); tasks.erase(prev);
task->Finish(); task->Finish(task->m_Result);
} else { } else {
it++; it++;
} }
@ -112,9 +112,7 @@ void Process::WorkerThreadProc(void)
lock.unlock(); lock.unlock();
if (!task->InitTask()) { if (task->CallWithExceptionGuard(boost::bind(&Process::InitTask, task))) {
task->Finish();
} else {
int fd = task->GetFD(); int fd = task->GetFD();
if (fd >= 0) if (fd >= 0)
tasks[fd] = task; tasks[fd] = task;
@ -125,9 +123,9 @@ void Process::WorkerThreadProc(void)
} }
} }
bool Process::InitTask(void) void Process::InitTask(void)
{ {
time(&m_ExecutionStart); time(&m_Result.ExecutionStart);
#ifdef _MSC_VER #ifdef _MSC_VER
m_FP = _popen(m_Command.c_str(), "r"); m_FP = _popen(m_Command.c_str(), "r");
@ -138,7 +136,7 @@ bool Process::InitTask(void)
m_FP = popen_noshell_compat(m_Command.c_str(), "r", m_FP = popen_noshell_compat(m_Command.c_str(), "r",
(popen_noshell_pass_to_pclose *)m_PCloseArg); (popen_noshell_pass_to_pclose *)m_PCloseArg);
if (m_FP == NULL) // TODO: add check for valgrind if (m_FP == NULL)
m_UsePopen = true; m_UsePopen = true;
} }
@ -146,13 +144,8 @@ bool Process::InitTask(void)
m_FP = popen(m_Command.c_str(), "r"); m_FP = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */ #endif /* _MSC_VER */
if (m_FP == NULL) { if (m_FP == NULL)
m_ExitStatus = 128; throw runtime_error("Could not create process.");
m_ExecutionEnd = m_ExecutionStart;
return false;
}
return true;
} }
bool Process::RunTask(void) bool Process::RunTask(void)
@ -180,7 +173,7 @@ bool Process::RunTask(void)
} }
#endif /* _MSC_VER */ #endif /* _MSC_VER */
time(&m_ExecutionEnd); time(&m_Result.ExecutionEnd);
#ifndef _MSC_VER #ifndef _MSC_VER
if (WIFEXITED(status)) { if (WIFEXITED(status)) {
@ -204,8 +197,8 @@ bool Process::RunTask(void)
} }
#endif /* _MSC_VER */ #endif /* _MSC_VER */
m_ExitStatus = exitcode; m_Result.ExitStatus = exitcode;
m_Output = output; m_Result.Output = output;
return false; return false;
} }
@ -215,23 +208,3 @@ int Process::GetFD(void) const
return fileno(m_FP); return fileno(m_FP);
} }
long Process::GetExitStatus(void) const
{
return m_ExitStatus;
}
string Process::GetOutput(void) const
{
return m_Output;
}
time_t Process::GetExecutionStart(void) const
{
return m_ExecutionStart;
}
time_t Process::GetExecutionEnd(void) const
{
return m_ExecutionEnd;
}

View File

@ -23,7 +23,15 @@
namespace icinga namespace icinga
{ {
class I2_BASE_API Process : public AsyncTask struct ProcessResult
{
time_t ExecutionStart;
time_t ExecutionEnd;
long ExitStatus;
string Output;
};
class I2_BASE_API Process : public AsyncTask<Process, ProcessResult>
{ {
public: public:
typedef shared_ptr<Process> Ptr; typedef shared_ptr<Process> Ptr;
@ -33,12 +41,6 @@ public:
Process(const string& command, const CompletionCallback& completionCallback); Process(const string& command, const CompletionCallback& completionCallback);
time_t GetExecutionStart(void) const;
time_t GetExecutionEnd(void) const;
long GetExitStatus(void) const;
string GetOutput(void) const;
private: private:
static bool m_ThreadCreated; static bool m_ThreadCreated;
@ -51,10 +53,7 @@ private:
void *m_PCloseArg; void *m_PCloseArg;
#endif /* _MSC_VER */ #endif /* _MSC_VER */
time_t m_ExecutionStart; ProcessResult m_Result;
time_t m_ExecutionEnd;
long m_ExitStatus;
string m_Output;
virtual void Run(void); virtual void Run(void);
@ -64,7 +63,7 @@ private:
static void WorkerThreadProc(void); static void WorkerThreadProc(void);
bool InitTask(void); void InitTask(void);
bool RunTask(void); bool RunTask(void);
int GetFD(void) const; int GetFD(void) const;

View File

@ -10,13 +10,3 @@ void ScriptTask::Run(void)
{ {
m_Function->Invoke(GetSelf(), m_Arguments); m_Function->Invoke(GetSelf(), m_Arguments);
} }
void ScriptTask::SetResult(const Variant& result)
{
m_Result = result;
}
Variant ScriptTask::GetResult(void)
{
return m_Result;
}

View File

@ -23,7 +23,7 @@
namespace icinga namespace icinga
{ {
class I2_BASE_API ScriptTask : public AsyncTask class I2_BASE_API ScriptTask : public AsyncTask<ScriptTask, Variant>
{ {
public: public:
typedef shared_ptr<ScriptTask> Ptr; typedef shared_ptr<ScriptTask> Ptr;
@ -31,16 +31,12 @@ public:
ScriptTask(const ScriptFunction::Ptr& function, const vector<Variant>& arguments, CompletionCallback callback); ScriptTask(const ScriptFunction::Ptr& function, const vector<Variant>& arguments, CompletionCallback callback);
void SetResult(const Variant& result);
Variant GetResult(void);
protected: protected:
virtual void Run(void); virtual void Run(void);
private: private:
ScriptFunction::Ptr m_Function; ScriptFunction::Ptr m_Function;
vector<Variant> m_Arguments; vector<Variant> m_Arguments;
Variant m_Result;
}; };
} }

View File

@ -47,7 +47,7 @@ void ConfigObjectAdapter::RemoveTag(const string& key)
} }
ScriptTask::Ptr ConfigObjectAdapter::InvokeHook(const string& hook, ScriptTask::Ptr ConfigObjectAdapter::InvokeHook(const string& hook,
const vector<Variant>& arguments, AsyncTask::CompletionCallback callback) const vector<Variant>& arguments, ScriptTask::CompletionCallback callback)
{ {
return m_ConfigObject->InvokeHook(hook, arguments, callback); return m_ConfigObject->InvokeHook(hook, arguments, callback);
} }

View File

@ -58,7 +58,7 @@ public:
void RemoveTag(const string& key); void RemoveTag(const string& key);
ScriptTask::Ptr InvokeHook(const string& hook, ScriptTask::Ptr InvokeHook(const string& hook,
const vector<Variant>& arguments, AsyncTask::CompletionCallback callback); const vector<Variant>& arguments, ScriptTask::CompletionCallback callback);
private: private:
ConfigObject::Ptr m_ConfigObject; ConfigObject::Ptr m_ConfigObject;

View File

@ -53,22 +53,21 @@ void NagiosCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Varia
process->Start(); process->Start();
} }
void NagiosCheckTask::ProcessFinishedHandler(const ScriptTask::Ptr& task, const AsyncTask::Ptr& aprocess, CheckResult result) void NagiosCheckTask::ProcessFinishedHandler(const ScriptTask::Ptr& task, const Process::Ptr& process, CheckResult result)
{ {
Process::Ptr process = static_pointer_cast<Process>(aprocess); ProcessResult pr;
pr = process->GetResult();
result.SetExecutionStart(process->GetExecutionStart()); result.SetExecutionStart(pr.ExecutionStart);
result.SetExecutionEnd(process->GetExecutionEnd()); result.SetExecutionEnd(pr.ExecutionEnd);
string output = process->GetOutput();
long exitcode = process->GetExitStatus();
string output = pr.Output;
boost::algorithm::trim(output); boost::algorithm::trim(output);
ProcessCheckOutput(result, output); ProcessCheckOutput(result, output);
ServiceState state; ServiceState state;
switch (exitcode) { switch (pr.ExitStatus) {
case 0: case 0:
state = StateOK; state = StateOK;
break; break;
@ -89,8 +88,7 @@ void NagiosCheckTask::ProcessFinishedHandler(const ScriptTask::Ptr& task, const
time(&now); time(&now);
result.SetScheduleEnd(now); result.SetScheduleEnd(now);
task->SetResult(result.GetDictionary()); task->Finish(result.GetDictionary());
task->Finish();
} }
void NagiosCheckTask::ProcessCheckOutput(CheckResult& result, const string& output) void NagiosCheckTask::ProcessCheckOutput(CheckResult& result, const string& output)
@ -129,5 +127,5 @@ void NagiosCheckTask::ProcessCheckOutput(CheckResult& result, const string& outp
void NagiosCheckTask::Register(void) void NagiosCheckTask::Register(void)
{ {
ScriptFunction::Ptr func = boost::make_shared<ScriptFunction>(&NagiosCheckTask::ScriptFunc); ScriptFunction::Ptr func = boost::make_shared<ScriptFunction>(&NagiosCheckTask::ScriptFunc);
ScriptFunction::Register("builtin::NagiosCheck", func); ScriptFunction::Register("native::NagiosCheck", func);
} }

View File

@ -31,7 +31,7 @@ public:
static void Register(void); static void Register(void);
private: private:
static void ProcessFinishedHandler(const ScriptTask::Ptr& task, const AsyncTask::Ptr& aprocess, CheckResult result); static void ProcessFinishedHandler(const ScriptTask::Ptr& task, const Process::Ptr& process, CheckResult result);
static void ProcessCheckOutput(CheckResult& result, const string& output); static void ProcessCheckOutput(CheckResult& result, const string& output);
}; };

View File

@ -99,10 +99,8 @@ void CheckerComponent::CheckTimerHandler(void)
Logger::Write(LogInformation, "checker", msgbuf.str()); Logger::Write(LogInformation, "checker", msgbuf.str());
} }
void CheckerComponent::CheckCompletedHandler(Service service, const AsyncTask::Ptr& atask) void CheckerComponent::CheckCompletedHandler(Service service, const ScriptTask::Ptr& task)
{ {
ScriptTask::Ptr task = static_pointer_cast<ScriptTask>(atask);
service.RemoveTag("current_task"); service.RemoveTag("current_task");
/* if the service isn't in the set of pending services /* if the service isn't in the set of pending services

View File

@ -60,7 +60,7 @@ private:
void CheckTimerHandler(void); void CheckTimerHandler(void);
void ResultTimerHandler(void); void ResultTimerHandler(void);
void CheckCompletedHandler(Service service, const AsyncTask::Ptr& atask); void CheckCompletedHandler(Service service, const ScriptTask::Ptr& task);
void AdjustCheckTimer(void); void AdjustCheckTimer(void);

View File

@ -41,7 +41,7 @@ object host "localhost" {
abstract object service "nagios-service" { abstract object service "nagios-service" {
hooks = { hooks = {
check = "builtin::NagiosCheck" check = "native::NagiosCheck"
}, },
macros = { macros = {
@ -50,7 +50,6 @@ abstract object service "nagios-service" {
} }
abstract object service "ping" inherits "nagios-service" { abstract object service "ping" inherits "nagios-service" {
check_type = "nagios",
check_command = "$plugindir$/check_ping -H $address$", check_command = "$plugindir$/check_ping -H $address$",
check_interval = 30 check_interval = 30
} }

View File

@ -34,12 +34,6 @@ IcingaApplication::IcingaApplication(void)
: m_PidPath(DefaultPidPath) : m_PidPath(DefaultPidPath)
{ } { }
void TestScriptFunc(const ScriptTask::Ptr& task, const vector<Variant>& arguments)
{
std::cout << "Got " << arguments.size() << " arguments." << std::endl;
task->Finish();
}
/** /**
* The entry point for the Icinga application. * The entry point for the Icinga application.
* *