Refactor AsyncTasks.

This commit is contained in:
Gunnar Beutner 2013-03-25 17:36:15 +00:00
parent e739dfd88f
commit fd3e92ea0c
57 changed files with 240 additions and 613 deletions

View File

@ -143,19 +143,21 @@ void CheckerComponent::CheckThreadProc(void)
Log(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
try {
CheckerComponent::Ptr self = GetSelf();
service->BeginExecuteCheck(boost::bind(&CheckerComponent::CheckCompletedHandler, self, service));
} catch (const std::exception& ex) {
Log(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + boost::diagnostic_information(ex));
}
CheckerComponent::Ptr self = GetSelf();
Utility::QueueAsyncCallback(boost::bind(&CheckerComponent::ExecuteCheckHelper, self, service));
lock.lock();
}
}
void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
void CheckerComponent::ExecuteCheckHelper(const Service::Ptr& service)
{
try {
service->ExecuteCheck();
} catch (const std::exception& ex) {
Log(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + boost::diagnostic_information(ex));
}
boost::mutex::scoped_lock lock(m_Mutex);
/* remove the service from the list of pending services; if it's not in the

View File

@ -95,7 +95,7 @@ private:
void CheckThreadProc(void);
void ResultTimerHandler(void);
void CheckCompletedHandler(const Service::Ptr& service);
void ExecuteCheckHelper(const Service::Ptr& service);
void AdjustCheckTimer(void);

View File

@ -25,6 +25,7 @@
#include "base/logger_fwd.h"
#include "base/convert.h"
#include "base/application.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <fstream>

View File

@ -25,6 +25,7 @@
#include "base/dynamicobject.h"
#include "base/objectlock.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/thread/thread.hpp>
#include <iostream>

View File

@ -28,6 +28,8 @@
#include "base/exception.h"
#include "base/convert.h"
#include "base/application.h"
#include "base/utility.h"
#include "base/scriptfunction.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
@ -333,7 +335,7 @@ void CompatLog::RotationTimerHandler(void)
ScheduleNextRotation();
}
void CompatLog::ValidateRotationMethod(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value CompatLog::ValidateRotationMethod(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Missing argument: Location must be specified."));
@ -352,5 +354,5 @@ void CompatLog::ValidateRotationMethod(const ScriptTask::Ptr& task, const std::v
location + ": Rotation method '" + rotation_method + "' is invalid.");
}
task->FinishResult(Empty);
return Empty;
}

View File

@ -46,7 +46,7 @@ public:
String GetLogDir(void) const;
String GetRotationMethod(void) const;
static void ValidateRotationMethod(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value ValidateRotationMethod(const std::vector<Value>& arguments);
protected:
virtual void OnAttributeChanged(const String& name);

View File

@ -22,6 +22,7 @@
#include "base/dynamictype.h"
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -22,6 +22,7 @@
#include "base/application.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/program_options.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/smart_ptr/make_shared.hpp>
@ -145,6 +146,9 @@ int main(int argc, char **argv)
lt_dlinit();
#endif /* _WIN32 */
/* Set thread title. */
Utility::SetThreadName("Main Thread");
/* Set command-line arguments. */
Application::SetArgC(argc);
Application::SetArgV(argv);

View File

@ -12,7 +12,6 @@ libbase_la_SOURCES = \
application.h \
array.cpp \
array.h \
asynctask.h \
attribute.cpp \
attribute.h \
connection.cpp \
@ -25,8 +24,6 @@ libbase_la_SOURCES = \
dynamicobject.h \
dynamictype.cpp \
dynamictype.h \
eventqueue.cpp \
eventqueue.h \
exception.cpp \
exception.h \
fifo.cpp \
@ -58,8 +55,6 @@ libbase_la_SOURCES = \
scriptinterpreter.h \
scriptlanguage.cpp \
scriptlanguage.h \
scripttask.cpp \
scripttask.h \
singleton.h \
socket.cpp \
socket.h \
@ -77,6 +72,8 @@ libbase_la_SOURCES = \
sysloglogger.h \
tcpsocket.cpp \
tcpsocket.h \
threadpool.cpp \
threadpool.h \
timer.cpp \
timer.h \
tlsstream.cpp \

View File

@ -132,7 +132,7 @@ void Application::ShutdownTimerHandler(void)
Application::GetInstance()->OnShutdown();
DynamicObject::DeactivateObjects();
GetEQ().Stop();
GetTP().Stop();
m_ShuttingDown = false;
}
}
@ -155,7 +155,7 @@ void Application::RunEventLoop(void) const
Timer::Initialize();
GetEQ().Join();
GetTP().Join();
Timer::Uninitialize();
}
@ -614,12 +614,12 @@ void Application::SetPkgDataDir(const String& path)
}
/**
* Returns the main thread's event queue.
* Returns the global thread pool.
*
* @returns The event queue.
* @returns The global thread pool.
*/
EventQueue& Application::GetEQ(void)
ThreadPool& Application::GetTP(void)
{
static EventQueue queue;
return queue;
static ThreadPool tp;
return tp;
}

View File

@ -21,7 +21,7 @@
#define APPLICATION_H
#include "base/i2-base.h"
#include "base/eventqueue.h"
#include "base/threadpool.h"
#include "base/dynamicobject.h"
namespace icinga {
@ -82,7 +82,7 @@ public:
static String GetPkgDataDir(void);
static void SetPkgDataDir(const String& path);
static EventQueue& GetEQ(void);
static ThreadPool& GetTP(void);
protected:
void RunEventLoop(void) const;

View File

@ -1,198 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef ASYNCTASK_H
#define ASYNCTASK_H
#include "base/i2-base.h"
#include "base/object.h"
#include "base/utility.h"
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/bind.hpp>
#include <boost/exception_ptr.hpp>
namespace icinga
{
/**
* An asynchronous task.
*
* @ingroup base
*/
template<typename TClass, typename TResult>
class AsyncTask : public Object
{
public:
typedef shared_ptr<AsyncTask<TClass, TResult> > Ptr;
typedef weak_ptr<AsyncTask<TClass, TResult> > WeakPtr;
/**
* A completion callback for an AsyncTask.
*/
typedef boost::function<void (const shared_ptr<TClass>&)> CompletionCallback;
/**
* Constructor for the AsyncTask class.
*/
AsyncTask(void)
: m_Finished(false), m_ResultRetrieved(false)
{ }
/**
* Destructor for the AsyncTask class.
*/
~AsyncTask(void)
{
ASSERT(m_Finished);
ASSERT(m_ResultRetrieved);
}
/**
* Starts the async task. The caller must hold a reference to the AsyncTask
* object until the completion callback is invoked.
*/
void Start(const CompletionCallback& completionCallback = CompletionCallback())
{
ASSERT(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_CompletionCallback = completionCallback;
Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::RunInternal, this));
}
/**
* Checks whether the task is finished.
*/
bool IsFinished(void) const
{
ASSERT(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
return m_Finished;
}
/**
* Blocks until the task is completed and retrieves the result. Throws an exception if one is stored in
* the AsyncTask object.
*
* @returns The task's result.
*/
TResult GetResult(void)
{
ASSERT(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Finished)
m_CV.wait(lock);
if (m_ResultRetrieved)
BOOST_THROW_EXCEPTION(std::runtime_error("GetResult called on an AsyncTask whose result was already retrieved."));
m_ResultRetrieved = true;
if (m_Exception)
rethrow_exception(m_Exception);
TResult result;
std::swap(m_Result, result);
return result;
}
/**
* Finishes the task using an exception.
*
* @param ex The exception.
*/
void FinishException(const boost::exception_ptr& ex)
{
ASSERT(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_Exception = ex;
FinishInternal();
}
/**
* Finishes the task using an ordinary result.
*
* @param result The result.
*/
void FinishResult(const TResult& result)
{
ASSERT(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_Result = result;
FinishInternal();
}
protected:
/**
* Begins executing the task. The Run method must ensure
* that one of the Finish*() functions is executed on the task
* object (possibly after the Run method has returned).
*/
virtual void Run(void) = 0;
private:
/**
* Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed.
*/
void FinishInternal(void)
{
ASSERT(!m_Finished);
m_Finished = true;
m_CV.notify_all();
if (!m_CompletionCallback.empty()) {
CompletionCallback callback;
m_CompletionCallback.swap(callback);
Utility::QueueAsyncCallback(boost::bind(callback, GetSelf()));
}
}
/**
* Calls the Run() method and catches exceptions.
*/
void RunInternal(void)
{
try {
Run();
} catch (...) {
FinishException(boost::current_exception());
}
}
mutable boost::mutex m_Mutex;
boost::condition_variable m_CV;
CompletionCallback m_CompletionCallback; /**< The completion callback. */
TResult m_Result; /**< The task's result. */
boost::exception_ptr m_Exception; /**< The task's exception. */
bool m_Finished; /**< Whether the task is finished. */
bool m_ResultRetrieved; /**< Whether the result was retrieved. */
};
}
#endif /* ASYNCTASK_H */

View File

@ -27,7 +27,7 @@
<ClCompile Include="dynamicobject.cpp" />
<ClCompile Include="dictionary.cpp" />
<ClCompile Include="dynamictype.cpp" />
<ClCompile Include="eventqueue.cpp" />
<ClCompile Include="threadpool.cpp" />
<ClCompile Include="exception.cpp" />
<ClCompile Include="fifo.cpp" />
<ClCompile Include="logger.cpp" />
@ -40,7 +40,6 @@
<ClCompile Include="qstring.cpp" />
<ClCompile Include="ringbuffer.cpp" />
<ClCompile Include="scriptfunction.cpp" />
<ClCompile Include="scripttask.cpp" />
<ClCompile Include="socket.cpp" />
<ClCompile Include="stacktrace.cpp" />
<ClCompile Include="stdiostream.cpp" />
@ -59,14 +58,13 @@
<ItemGroup>
<ClInclude Include="application.h" />
<ClInclude Include="array.h" />
<ClInclude Include="asynctask.h" />
<ClInclude Include="attribute.h" />
<ClInclude Include="connection.h" />
<ClInclude Include="convert.h" />
<ClInclude Include="dynamicobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="dynamictype.h" />
<ClInclude Include="eventqueue.h" />
<ClInclude Include="threadpool.h" />
<ClInclude Include="fifo.h" />
<ClInclude Include="logger_fwd.h" />
<ClInclude Include="registry.h" />
@ -77,7 +75,6 @@
<ClInclude Include="netstring.h" />
<ClInclude Include="qstring.h" />
<ClInclude Include="scriptfunction.h" />
<ClInclude Include="scripttask.h" />
<ClInclude Include="logger.h" />
<ClInclude Include="exception.h" />
<ClInclude Include="i2-base.h" />
@ -256,4 +253,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -27,7 +27,7 @@
#include "base/logger_fwd.h"
#include "base/exception.h"
#include "base/timer.h"
#include "base/scripttask.h"
#include "base/scriptfunction.h"
#include <fstream>
#include <boost/make_shared.hpp>
#include <boost/foreach.hpp>
@ -410,7 +410,7 @@ void DynamicObject::Unregister(void)
dtype->UnregisterObject(GetSelf());
}
ScriptTask::Ptr DynamicObject::MakeMethodTask(const String& method,
Value DynamicObject::InvokeMethod(const String& method,
const std::vector<Value>& arguments)
{
Dictionary::Ptr methods;
@ -418,19 +418,19 @@ ScriptTask::Ptr DynamicObject::MakeMethodTask(const String& method,
methods = m_Methods;
if (!methods)
return ScriptTask::Ptr();
BOOST_THROW_EXCEPTION(std::invalid_argument("Method '" + method + "' does not exist."));
String funcName = methods->Get(method);
if (funcName.IsEmpty())
return ScriptTask::Ptr();
BOOST_THROW_EXCEPTION(std::invalid_argument("Method '" + method + "' does not exist."));
ScriptFunction::Ptr func = ScriptFunctionRegistry::GetInstance()->GetItem(funcName);
if (!func)
BOOST_THROW_EXCEPTION(std::invalid_argument("Function '" + funcName + "' does not exist."));
return boost::make_shared<ScriptTask>(func, arguments);
return func->Invoke(arguments);
}
void DynamicObject::DumpObjects(const String& filename)

View File

@ -22,7 +22,6 @@
#include "base/i2-base.h"
#include "base/attribute.h"
#include "base/scripttask.h"
#include "base/object.h"
#include "base/dictionary.h"
#include <boost/signals2.hpp>
@ -72,8 +71,7 @@ public:
static boost::signals2::signal<void (double, const std::set<DynamicObject::WeakPtr>&)> OnTransactionClosing;
static boost::signals2::signal<void (double, const DynamicObject::Ptr&)> OnFlushObject;
ScriptTask::Ptr MakeMethodTask(const String& method,
const std::vector<Value>& arguments);
Value InvokeMethod(const String& method, const std::vector<Value>& arguments);
shared_ptr<DynamicType> GetType(void) const;
String GetName(void) const;

View File

@ -22,6 +22,7 @@
#include "base/convert.h"
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/utility.h"
#include <map>
#include <boost/bind.hpp>
#include <boost/tuple/tuple.hpp>
@ -41,14 +42,12 @@ extern char **environ;
#define environ (*_NSGetEnviron())
#endif /* __APPLE__ */
void Process::Run(void)
ProcessResult Process::Run(void)
{
ProcessResult result;
result.ExecutionStart = Utility::GetTime();
ASSERT(m_FD == -1);
int fds[2];
#if HAVE_PIPE2
@ -195,7 +194,7 @@ void Process::Run(void)
result.ExitStatus = exitcode;
result.Output = output;
FinishResult(result);
return result;
}
#endif /* _WIN32 */

View File

@ -26,7 +26,7 @@
using namespace icinga;
Process::Process(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
: AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
: m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
{ }
std::vector<String> Process::SplitCommand(const Value& command)

View File

@ -22,7 +22,6 @@
#include "base/i2-base.h"
#include "base/timer.h"
#include "base/asynctask.h"
#include "base/dictionary.h"
#include <sstream>
#include <deque>
@ -52,7 +51,7 @@ struct ProcessResult
*
* @ingroup base
*/
class I2_BASE_API Process : public AsyncTask<Process, ProcessResult>
class I2_BASE_API Process : public Object
{
public:
typedef shared_ptr<Process> Ptr;
@ -62,6 +61,8 @@ public:
Process(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr());
ProcessResult Run(void);
static std::vector<String> SplitCommand(const Value& command);
private:
std::vector<String> m_Arguments;
@ -69,9 +70,8 @@ private:
#ifndef _WIN32
pid_t m_Pid;
#endif /* _WIN32 */
virtual void Run(void);
#endif /* _WIN32 */
};
}

View File

@ -17,9 +17,8 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/registry.h"
#include "base/scriptfunction.h"
#include "base/scripttask.h"
#include "base/registry.h"
#include <boost/smart_ptr/make_shared.hpp>
using namespace icinga;
@ -28,9 +27,9 @@ ScriptFunction::ScriptFunction(const Callback& function)
: m_Callback(function)
{ }
void ScriptFunction::Invoke(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value ScriptFunction::Invoke(const std::vector<Value>& arguments)
{
m_Callback(task, arguments);
return m_Callback(arguments);
}
RegisterFunctionHelper::RegisterFunctionHelper(const String& name, const ScriptFunction::Callback& function)

View File

@ -42,16 +42,14 @@ public:
typedef shared_ptr<ScriptFunction> Ptr;
typedef weak_ptr<ScriptFunction> WeakPtr;
typedef boost::function<void (const shared_ptr<ScriptTask>&, const std::vector<Value>& arguments)> Callback;
typedef boost::function<Value (const std::vector<Value>& arguments)> Callback;
explicit ScriptFunction(const Callback& function);
Value Invoke(const std::vector<Value>& arguments);
private:
Callback m_Callback;
void Invoke(const shared_ptr<ScriptTask>& task, const std::vector<Value>& arguments);
friend class ScriptTask;
};
/**

View File

@ -18,6 +18,7 @@
******************************************************************************/
#include "base/scriptinterpreter.h"
#include "base/scriptfunction.h"
#include "base/objectlock.h"
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
@ -41,7 +42,7 @@ void ScriptInterpreter::SubscribeFunction(const String& name)
m_SubscribedFunctions.insert(name);
ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2));
ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, name, _1));
ScriptFunctionRegistry::GetInstance()->Register(name, sf);
}

View File

@ -22,7 +22,6 @@
#include "base/i2-base.h"
#include "base/script.h"
#include "base/scripttask.h"
#include <vector>
#include <set>
@ -45,8 +44,7 @@ public:
protected:
ScriptInterpreter(const Script::Ptr& script);
virtual void ProcessCall(const ScriptTask::Ptr& task, const String& function,
const std::vector<Value>& arguments) = 0;
virtual Value ProcessCall(const String& function, const std::vector<Value>& arguments) = 0;
void SubscribeFunction(const String& name);
void UnsubscribeFunction(const String& name);

View File

@ -1,33 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/scripttask.h"
using namespace icinga;
ScriptTask::ScriptTask(const ScriptFunction::Ptr& function,
const std::vector<Value>& arguments)
: AsyncTask<ScriptTask, Value>(), m_Function(function),
m_Arguments(arguments)
{ }
void ScriptTask::Run(void)
{
m_Function->Invoke(GetSelf(), m_Arguments);
}

View File

@ -1,54 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef SCRIPTTASK_H
#define SCRIPTTASK_H
#include "base/i2-base.h"
#include "base/asynctask.h"
#include "base/scriptfunction.h"
#include <vector>
namespace icinga
{
/**
* A script task.
*
* @ingroup base
*/
class I2_BASE_API ScriptTask : public AsyncTask<ScriptTask, Value>
{
public:
typedef shared_ptr<ScriptTask> Ptr;
typedef weak_ptr<ScriptTask> WeakPtr;
ScriptTask(const ScriptFunction::Ptr& function, const std::vector<Value>& arguments);
protected:
virtual void Run(void);
private:
ScriptFunction::Ptr m_Function;
std::vector<Value> m_Arguments;
};
}
#endif /* SCRIPTTASK_H */

View File

@ -100,7 +100,7 @@ void StreamLogger::ProcessLogEntry(std::ostream& stream, bool tty, const LogEntr
}
}
stream << "[" << timestamp << "] <" << boost::this_thread::get_id() << "> "
stream << "[" << timestamp << "] <" << Utility::GetThreadName() << "> "
<< Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": "
<< entry.Message;

View File

@ -17,7 +17,7 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/eventqueue.h"
#include "base/threadpool.h"
#include "base/logger_fwd.h"
#include "base/convert.h"
#include "base/utility.h"
@ -29,7 +29,7 @@
using namespace icinga;
EventQueue::EventQueue(void)
ThreadPool::ThreadPool(void)
: m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0)
{
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++)
@ -38,17 +38,17 @@ EventQueue::EventQueue(void)
for (int i = 0; i < 2; i++)
SpawnWorker();
boost::thread managerThread(boost::bind(&EventQueue::ManagerThreadProc, this));
boost::thread managerThread(boost::bind(&ThreadPool::ManagerThreadProc, this));
managerThread.detach();
}
EventQueue::~EventQueue(void)
ThreadPool::~ThreadPool(void)
{
Stop();
Join();
}
void EventQueue::Stop(void)
void ThreadPool::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
@ -58,11 +58,11 @@ void EventQueue::Stop(void)
/**
* Waits for all worker threads to finish.
*/
void EventQueue::Join(void)
void ThreadPool::Join(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Stopped || !m_Events.empty()) {
while (!m_Stopped || !m_WorkItems.empty()) {
lock.unlock();
Utility::Sleep(0.5);
lock.lock();
@ -70,12 +70,16 @@ void EventQueue::Join(void)
}
/**
* Waits for events and processes them.
* Waits for work items and processes them.
*/
void EventQueue::QueueThreadProc(int tid)
void ThreadPool::QueueThreadProc(int tid)
{
std::ostringstream idbuf;
idbuf << "TP " << this << " Worker #" << tid;
Utility::SetThreadName(idbuf.str());
for (;;) {
EventQueueWorkItem event;
WorkItem wi;
double ws = Utility::GetTime();
double st;
@ -85,7 +89,7 @@ void EventQueue::QueueThreadProc(int tid)
m_ThreadStates[tid] = ThreadIdle;
while (m_Events.empty() && !m_Stopped && m_ThreadDeaths == 0)
while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0)
m_CV.wait(lock);
if (m_ThreadDeaths > 0) {
@ -93,11 +97,11 @@ void EventQueue::QueueThreadProc(int tid)
break;
}
if (m_Events.empty() && m_Stopped)
if (m_WorkItems.empty() && m_Stopped)
break;
event = m_Events.front();
m_Events.pop_front();
wi = m_WorkItems.front();
m_WorkItems.pop_front();
m_ThreadStates[tid] = ThreadBusy;
st = Utility::GetTime();
@ -113,7 +117,7 @@ void EventQueue::QueueThreadProc(int tid)
#endif /* _DEBUG */
try {
event.Callback();
wi.Callback();
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Exception thrown in event handler: " << std::endl
@ -125,7 +129,7 @@ void EventQueue::QueueThreadProc(int tid)
}
double et = Utility::GetTime();
double latency = st - event.Timestamp;
double latency = st - wi.Timestamp;
{
boost::mutex::scoped_lock lock(m_Mutex);
@ -175,27 +179,31 @@ void EventQueue::QueueThreadProc(int tid)
}
/**
* Appends an event to the event queue. Events will be processed in FIFO order.
* Appends a work item to the work queue. Work items will be processed in FIFO order.
*
* @param callback The callback function for the event.
* @param callback The callback function for the work item.
*/
void EventQueue::Post(const EventQueueCallback& callback)
void ThreadPool::Post(const ThreadPool::WorkFunction& callback)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (m_Stopped)
BOOST_THROW_EXCEPTION(std::runtime_error("EventQueue has been stopped."));
BOOST_THROW_EXCEPTION(std::runtime_error("ThreadPool has been stopped."));
EventQueueWorkItem event;
event.Callback = callback;
event.Timestamp = Utility::GetTime();
WorkItem wi;
wi.Callback = callback;
wi.Timestamp = Utility::GetTime();
m_Events.push_back(event);
m_WorkItems.push_back(wi);
m_CV.notify_one();
}
void EventQueue::ManagerThreadProc(void)
void ThreadPool::ManagerThreadProc(void)
{
std::ostringstream idbuf;
idbuf << "TP " << this << " Manager";
Utility::SetThreadName(idbuf.str());
for (;;) {
Utility::Sleep(5);
@ -203,46 +211,38 @@ void EventQueue::ManagerThreadProc(void)
int pending, alive;
double avg_latency, max_latency;
double utilization = 0;
{
boost::mutex::scoped_lock lock(m_Mutex);
pending = m_Events.size();
pending = m_WorkItems.size();
alive = 0;
double util = 0;
int hg = 0;
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
if (m_ThreadStates[i] != ThreadDead) {
alive++;
util += m_ThreadUtilization[i] * 100;
std::cout << (int)(m_ThreadUtilization[i] * 100) << "\t";
hg++;
if (hg % 25 == 0)
std::cout << std::endl;
utilization += m_ThreadUtilization[i] * 100;
}
}
util /= alive;
std::cout << std::endl;
utilization /= alive;
if (m_TaskCount > 0)
avg_latency = m_WaitTime / (m_TaskCount * 1.0);
else
avg_latency = 0;
std::cout << "Wait time: " << m_WaitTime << "; Service time: " << m_ServiceTime << "; tasks: " << m_TaskCount << std::endl;
std::cout << "Thread util: " << util << std::endl;
if (util < 60 || util > 80) {
int tthreads = ceil((util * alive) / 80.0) - alive;
if (utilization < 60 || utilization > 80) {
int tthreads = ceil((utilization * alive) / 80.0) - alive;
/* Don't ever kill the last 2 threads. */
if (alive + tthreads < 2)
tthreads = 2 - alive;
std::cout << "Target threads: " << tthreads << "; Alive: " << alive << std::endl;
/* Spawn more workers if there are outstanding work items. */
if (tthreads > 0 && pending > 0)
tthreads = 8;
for (int i = 0; i < -tthreads; i++)
KillWorker();
@ -260,8 +260,11 @@ void EventQueue::ManagerThreadProc(void)
}
std::ostringstream msgbuf;
msgbuf << "Pending tasks: " << pending << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
<< "; Max latency: " << (long)(max_latency * 1000) << "ms";
msgbuf << "Pending tasks: " << pending << "; Average latency: "
<< (long)(avg_latency * 1000) << "ms"
<< "; Max latency: " << (long)(max_latency * 1000) << "ms"
<< "; Threads: " << alive
<< "; Pool utilization: " << utilization << "%";
Log(LogInformation, "base", msgbuf.str());
}
}
@ -269,7 +272,7 @@ void EventQueue::ManagerThreadProc(void)
/**
* Note: Caller must hold m_Mutex
*/
void EventQueue::SpawnWorker(void)
void ThreadPool::SpawnWorker(void)
{
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
if (m_ThreadStates[i] == ThreadDead) {
@ -277,7 +280,7 @@ void EventQueue::SpawnWorker(void)
m_ThreadStates[i] = ThreadIdle;
m_ThreadUtilization[i] = 0;
boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
boost::thread worker(boost::bind(&ThreadPool::QueueThreadProc, this, i));
worker.detach();
break;
@ -288,7 +291,7 @@ void EventQueue::SpawnWorker(void)
/**
* Note: Caller must hold m_Mutex.
*/
void EventQueue::KillWorker(void)
void ThreadPool::KillWorker(void)
{
Log(LogDebug, "base", "Killing worker thread.");
@ -298,7 +301,7 @@ void EventQueue::KillWorker(void)
/**
* Note: Caller must hold m_Mutex.
*/
void EventQueue::UpdateThreadUtilization(int tid, double time, double utilization)
void ThreadPool::UpdateThreadUtilization(int tid, double time, double utilization)
{
const double avg_time = 5.0;

View File

@ -17,8 +17,8 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef EVENTQUEUE_H
#define EVENTQUEUE_H
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include "base/i2-base.h"
#include <stack>
@ -30,38 +30,32 @@
namespace icinga
{
enum ThreadState
{
ThreadDead,
ThreadIdle,
ThreadBusy
};
typedef boost::function<void ()> EventQueueCallback;
struct EventQueueWorkItem
{
EventQueueCallback Callback;
double Timestamp;
};
/**
* An event queue.
* A thread pool.
*
* @ingroup base
*/
class I2_BASE_API EventQueue
class I2_BASE_API ThreadPool
{
public:
EventQueue(void);
~EventQueue(void);
typedef boost::function<void ()> WorkFunction;
ThreadPool(void);
~ThreadPool(void);
void Stop(void);
void Join(void);
void Post(const EventQueueCallback& callback);
void Post(const WorkFunction& callback);
private:
enum ThreadState
{
ThreadDead,
ThreadIdle,
ThreadBusy
};
ThreadState m_ThreadStates[512];
double m_ThreadUtilization[512];
int m_ThreadDeaths;
@ -76,7 +70,15 @@ private:
boost::condition_variable m_CV;
bool m_Stopped;
std::deque<EventQueueWorkItem> m_Events;
struct WorkItem
{
WorkFunction Callback;
double Timestamp;
};
std::deque<WorkItem> m_WorkItems;
void QueueThreadProc(int tid);
void ManagerThreadProc(void);

View File

@ -300,6 +300,6 @@ void Timer::TimerThreadProc(void)
lock.unlock();
/* Asynchronously call the timer. */
Application::GetEQ().Post(boost::bind(&Timer::Call, timer));
Application::GetTP().Post(boost::bind(&Timer::Call, timer));
}
}

View File

@ -35,6 +35,8 @@
using namespace icinga;
boost::thread_specific_ptr<String> Utility::m_ThreadName;
/**
* Demangles a symbol name.
*
@ -400,7 +402,7 @@ void Utility::SetNonBlockingSocket(SOCKET s)
void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
{
Application::GetEQ().Post(callback);
Application::GetTP().Post(callback);
}
String Utility::FormatDateTime(const char *format, double ts)
@ -477,3 +479,21 @@ String Utility::EscapeShellCmd(const String& s)
return result;
}
void Utility::SetThreadName(const String& name)
{
m_ThreadName.reset(new String(name));
}
String Utility::GetThreadName(void)
{
String *name = m_ThreadName.get();
if (!name) {
std::ostringstream idbuf;
idbuf << boost::this_thread::get_id();
return idbuf.str();
}
return *name;
}

View File

@ -24,6 +24,7 @@
#include "base/qstring.h"
#include <typeinfo>
#include <boost/function.hpp>
#include <boost/thread/tss.hpp>
namespace icinga
{
@ -77,8 +78,13 @@ public:
static String EscapeShellCmd(const String& s);
static void SetThreadName(const String& name);
static String GetThreadName(void);
private:
Utility(void);
static boost::thread_specific_ptr<String> m_ThreadName;
};
}

View File

@ -21,7 +21,7 @@
#include "config/configcompilercontext.h"
#include "base/objectlock.h"
#include "base/convert.h"
#include "base/scripttask.h"
#include "base/scriptfunction.h"
#include <boost/tuple/tuple.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
@ -133,9 +133,7 @@ void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
arguments.push_back(LocationToString(locations));
arguments.push_back(dictionary);
ScriptTask::Ptr task = boost::make_shared<ScriptTask>(func, arguments);
task->Start();
task->GetResult();
func->Invoke(arguments);
}
}
@ -211,9 +209,7 @@ void ConfigType::ValidateArray(const Array::Ptr& array,
arguments.push_back(LocationToString(locations));
arguments.push_back(array);
ScriptTask::Ptr task = boost::make_shared<ScriptTask>(func, arguments);
task->Start();
task->GetResult();
func->Invoke(arguments);
}
}

View File

@ -25,7 +25,7 @@ using namespace icinga;
REGISTER_SCRIPTFUNCTION(GetAnswerToEverything, &API::GetAnswerToEverything);
void API::GetAnswerToEverything(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value API::GetAnswerToEverything(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Text argument required."));
@ -34,5 +34,5 @@ void API::GetAnswerToEverything(const ScriptTask::Ptr& task, const std::vector<V
Log(LogInformation, "icinga", "Hello from the Icinga 2 API: " + text);
task->FinishResult(42);
return 42;
}

View File

@ -21,7 +21,8 @@
#define API_H
#include "icinga/i2-icinga.h"
#include "base/scripttask.h"
#include "base/value.h"
#include <vector>
namespace icinga
{
@ -34,7 +35,7 @@ namespace icinga
class I2_ICINGA_API API
{
public:
static void GetAnswerToEverything(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value GetAnswerToEverything(const std::vector<Value>& arguments);
private:
API(void);

View File

@ -28,6 +28,7 @@
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/application.h"
#include "base/utility.h"
#include <fstream>
#include <boost/algorithm/string/classification.hpp>
#include <boost/foreach.hpp>

View File

@ -25,6 +25,8 @@
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/convert.h"
#include "base/scriptfunction.h"
#include "base/utility.h"
#include "config/configitembuilder.h"
#include "config/configcompilercontext.h"
#include <boost/tuple/tuple.hpp>
@ -350,7 +352,7 @@ void Host::RefreshServicesCache(void)
l_ServicesCache.swap(newServicesCache);
}
void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value Host::ValidateServiceDictionary(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Missing argument: Location must be specified."));
@ -402,7 +404,7 @@ void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const std::vec
}
}
task->FinishResult(Empty);
return Empty;
}
Service::Ptr Host::GetServiceByShortName(const Value& name) const

View File

@ -103,8 +103,7 @@ public:
std::set<shared_ptr<Service> > GetServices(void) const;
static void InvalidateServicesCache(void);
static void ValidateServiceDictionary(const ScriptTask::Ptr& task,
const std::vector<icinga::Value>& arguments);
static Value ValidateServiceDictionary(const std::vector<icinga::Value>& arguments);
static HostState CalculateState(ServiceState state, bool reachable);

View File

@ -21,6 +21,7 @@
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/utility.h"
#include "base/timer.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -23,6 +23,7 @@
#include "base/dynamictype.h"
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/utility.h"
#include <boost/tuple/tuple.hpp>
#include <boost/foreach.hpp>
#include <boost/exception/diagnostic_information.hpp>
@ -231,11 +232,11 @@ void Notification::BeginExecuteNotification(NotificationType type, const Diction
BOOST_FOREACH(const User::Ptr& user, allUsers) {
Log(LogDebug, "icinga", "Sending notification for user " + user->GetName());
BeginExecuteNotificationHelper(type, user, cr, ignore_timeperiod);
Utility::QueueAsyncCallback(boost::bind(&Notification::ExecuteNotificationHelper, this, type, user, cr, ignore_timeperiod));
}
}
void Notification::BeginExecuteNotificationHelper(NotificationType type, const User::Ptr& user, const Dictionary::Ptr& cr, bool ignore_timeperiod)
void Notification::ExecuteNotificationHelper(NotificationType type, const User::Ptr& user, const Dictionary::Ptr& cr, bool ignore_timeperiod)
{
ASSERT(!OwnsLock());
@ -257,36 +258,8 @@ void Notification::BeginExecuteNotificationHelper(NotificationType type, const U
arguments.push_back(cr);
arguments.push_back(type);
ScriptTask::Ptr task = MakeMethodTask("notify", arguments);
if (!task) {
Log(LogWarning, "icinga", "Notification object '" + GetName() + "' doesn't have a 'notify' method.");
return;
}
{
ObjectLock olock(this);
/* We need to keep the task object alive until the completion handler is called. */
m_Tasks.insert(task);
}
task->Start(boost::bind(&Notification::NotificationCompletedHandler, self, _1));
}
void Notification::NotificationCompletedHandler(const ScriptTask::Ptr& task)
{
ASSERT(!OwnsLock());
{
ObjectLock olock(this);
m_Tasks.erase(task);
}
try {
task->GetResult();
InvokeMethod("notify", arguments);
Log(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'");
} catch (const std::exception& ex) {

View File

@ -100,11 +100,7 @@ private:
Attribute<String> m_HostName;
Attribute<String> m_Service;
std::set<ScriptTask::Ptr> m_Tasks;
void NotificationCompletedHandler(const ScriptTask::Ptr& task);
void BeginExecuteNotificationHelper(NotificationType type, const User::Ptr& user, const Dictionary::Ptr& cr, bool ignore_timeperiod);
void ExecuteNotificationHelper(NotificationType type, const User::Ptr& user, const Dictionary::Ptr& cr, bool ignore_timeperiod);
};
}

View File

@ -20,13 +20,14 @@
#include "icinga/nullchecktask.h"
#include "icinga/service.h"
#include "base/dictionary.h"
#include "base/scriptfunction.h"
#include <boost/smart_ptr/make_shared.hpp>
using namespace icinga;
REGISTER_SCRIPTFUNCTION(NullCheck, &NullCheckTask::ScriptFunc);
REGISTER_SCRIPTFUNCTION(NullCheck, &NullCheckTask::ScriptFunc);
void NullCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value NullCheckTask::ScriptFunc(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Missing argument: Service must be specified."));
@ -34,5 +35,5 @@ void NullCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Va
Dictionary::Ptr cr = boost::make_shared<Dictionary>();
cr->Set("state", StateUnknown);
task->FinishResult(cr);
return cr;
}

View File

@ -21,7 +21,7 @@
#define NULLCHECKTASK_H
#include "icinga/i2-icinga.h"
#include "base/scripttask.h"
#include "base/value.h"
namespace icinga
{
@ -34,7 +34,7 @@ namespace icinga
class I2_ICINGA_API NullCheckTask
{
public:
static void ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value ScriptFunc(const std::vector<Value>& arguments);
private:
NullCheckTask(void);

View File

@ -26,6 +26,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/convert.h"
#include "base/utility.h"
#include "base/application.h"
#include <boost/smart_ptr/make_shared.hpp>

View File

@ -22,6 +22,8 @@
#include "icinga/icingaapplication.h"
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/scriptfunction.h"
#include "base/utility.h"
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/smart_ptr/make_shared.hpp>
@ -31,11 +33,7 @@ using namespace icinga;
REGISTER_SCRIPTFUNCTION(PluginCheck, &PluginCheckTask::ScriptFunc);
PluginCheckTask::PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr& process, const Value& command)
: m_Task(task), m_Process(process), m_Command(command)
{ }
void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value PluginCheckTask::ScriptFunc(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Missing argument: Service must be specified."));
@ -70,32 +68,17 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const std::vector<
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), envMacros);
PluginCheckTask ct(task, process, command);
process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
}
void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
{
ProcessResult pr;
try {
pr = ct.m_Process->GetResult();
} catch (...) {
ct.m_Task->FinishException(boost::current_exception());
return;
}
ProcessResult pr = process->Run();
String output = pr.Output;
output.Trim();
Dictionary::Ptr result = ParseCheckOutput(output);
result->Set("command", ct.m_Command);
result->Set("command", command);
result->Set("state", ExitStatusToState(pr.ExitStatus));
result->Set("execution_start", pr.ExecutionStart);
result->Set("execution_end", pr.ExecutionEnd);
ct.m_Task->FinishResult(result);
return result;
}
ServiceState PluginCheckTask::ExitStatusToState(int exitStatus)

View File

@ -22,7 +22,6 @@
#include "icinga/i2-icinga.h"
#include "icinga/service.h"
#include "base/scripttask.h"
#include "base/process.h"
namespace icinga
@ -36,19 +35,13 @@ namespace icinga
class I2_ICINGA_API PluginCheckTask
{
public:
static void ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value ScriptFunc(const std::vector<Value>& arguments);
static ServiceState ExitStatusToState(int exitStatus);
static Dictionary::Ptr ParseCheckOutput(const String& output);
private:
static void ProcessFinishedHandler(PluginCheckTask ct);
PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr& process, const Value& command);
ScriptTask::Ptr m_Task;
Process::Ptr m_Process;
Value m_Command;
PluginCheckTask(void);
};
}

View File

@ -24,6 +24,8 @@
#include "icinga/icingaapplication.h"
#include "base/scriptfunction.h"
#include "base/logger_fwd.h"
#include "base/utility.h"
#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
@ -31,12 +33,7 @@ using namespace icinga;
REGISTER_SCRIPTFUNCTION(PluginNotification, &PluginNotificationTask::ScriptFunc);
PluginNotificationTask::PluginNotificationTask(const ScriptTask::Ptr& task, const Process::Ptr& process,
const String& service, const String& command)
: m_Task(task), m_Process(process), m_ServiceName(service), m_Command(command)
{ }
void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value PluginNotificationTask::ScriptFunc(const std::vector<Value>& arguments)
{
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(std::invalid_argument("Missing argument: Notification target must be specified."));
@ -55,13 +52,9 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const std::
Dictionary::Ptr cr = arguments[2];
NotificationType type = static_cast<NotificationType>(static_cast<int>(arguments[3]));
Value raw_command = notification->GetNotificationCommand();
String service_name;
Service::Ptr service = notification->GetService();
if (service)
service_name = service->GetName();
Value raw_command = notification->GetNotificationCommand();
StaticMacroResolver::Ptr notificationMacroResolver = boost::make_shared<StaticMacroResolver>();
notificationMacroResolver->Add("NOTIFICATIONTYPE", Notification::NotificationTypeToString(type));
@ -95,30 +88,15 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const std::
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), envMacros);
PluginNotificationTask ct(task, process, service_name, command);
ProcessResult pr = process->Run();
process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
}
void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
{
ProcessResult pr;
try {
pr = ct.m_Process->GetResult();
if (pr.ExitStatus != 0) {
std::ostringstream msgbuf;
msgbuf << "Notification command '" << ct.m_Command << "' for service '"
<< ct.m_ServiceName << "' failed; exit status: "
<< pr.ExitStatus << ", output: " << pr.Output;
Log(LogWarning, "icinga", msgbuf.str());
}
ct.m_Task->FinishResult(Empty);
} catch (...) {
ct.m_Task->FinishException(boost::current_exception());
return;
if (pr.ExitStatus != 0) {
std::ostringstream msgbuf;
msgbuf << "Notification command '" << Convert::ToString(command) << "' for service '"
<< service->GetName() << "' failed; exit status: "
<< pr.ExitStatus << ", output: " << pr.Output;
Log(LogWarning, "icinga", msgbuf.str());
}
return Empty;
}

View File

@ -21,7 +21,6 @@
#define PLUGINNOTIFICATIONTASK_H
#include "icinga/i2-icinga.h"
#include "base/scripttask.h"
#include "base/process.h"
namespace icinga
@ -35,19 +34,10 @@ namespace icinga
class I2_ICINGA_API PluginNotificationTask
{
public:
static void ScriptFunc(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value ScriptFunc(const std::vector<Value>& arguments);
private:
static void ProcessFinishedHandler(PluginNotificationTask ct);
PluginNotificationTask(const ScriptTask::Ptr& task, const Process::Ptr& process,
const String& service, const String& command);
ScriptTask::Ptr m_Task;
Process::Ptr m_Process;
String m_ServiceName;
String m_Command;
PluginNotificationTask(void);
};
}

View File

@ -332,7 +332,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
long old_attempt = GetCurrentCheckAttempt();
bool recovery;
/* The BeginExecuteCheck function already sets the old state, but we need to do it again
/* The ExecuteCheck function already sets the old state, but we need to do it again
* in case this was a passive check result. */
SetLastState(old_state);
SetLastStateType(old_stateType);
@ -522,7 +522,7 @@ bool Service::IsAllowedChecker(const String& checker) const
return false;
}
void Service::BeginExecuteCheck(const boost::function<void (void)>& callback)
void Service::ExecuteCheck(void)
{
ASSERT(!OwnsLock());
@ -532,14 +532,8 @@ void Service::BeginExecuteCheck(const boost::function<void (void)>& callback)
ObjectLock olock(this);
/* don't run another check if there is one pending */
if (m_CheckRunning) {
olock.Unlock();
/* we need to call the callback anyway */
callback();
if (m_CheckRunning)
return;
}
m_CheckRunning = true;
@ -558,32 +552,10 @@ void Service::BeginExecuteCheck(const boost::function<void (void)>& callback)
std::vector<Value> arguments;
arguments.push_back(self);
ScriptTask::Ptr task = MakeMethodTask("check", arguments);
{
ObjectLock olock(this);
self->m_CurrentTask = task;
}
task->Start(boost::bind(&Service::CheckCompletedHandler, self, checkInfo, _1, callback));
}
void Service::CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
const ScriptTask::Ptr& task, const boost::function<void (void)>& callback)
{
ASSERT(!OwnsLock());
checkInfo->Set("execution_end", Utility::GetTime());
checkInfo->Set("schedule_end", Utility::GetTime());
checkInfo->Seal();
Dictionary::Ptr result;
try {
Value vresult = task->GetResult();
if (vresult.IsObjectType<Dictionary>())
result = vresult;
result = InvokeMethod("check", arguments);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Exception occured during check for service '"
@ -597,6 +569,10 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
result->Set("output", message);
}
checkInfo->Set("execution_end", Utility::GetTime());
checkInfo->Set("schedule_end", Utility::GetTime());
checkInfo->Seal();
if (result) {
if (!result->Contains("schedule_start"))
result->Set("schedule_start", checkInfo->Get("schedule_start"));
@ -630,11 +606,8 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
{
ObjectLock olock(this);
m_CurrentTask.reset();
m_CheckRunning = false;
}
callback();
}
void Service::UpdateStatistics(const Dictionary::Ptr& cr)

View File

@ -22,6 +22,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
#include <boost/tuple/tuple.hpp>

View File

@ -22,6 +22,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/tuple/tuple.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -25,6 +25,7 @@
#include "base/dynamictype.h"
#include "base/objectlock.h"
#include "base/convert.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -163,7 +163,7 @@ public:
void AcknowledgeProblem(AcknowledgementType type, double expiry = 0);
void ClearAcknowledgement(void);
void BeginExecuteCheck(const boost::function<void (void)>& callback);
void ExecuteCheck(void);
void ProcessCheckResult(const Dictionary::Ptr& cr);
static double CalculateExecutionTime(const Dictionary::Ptr& cr);
@ -282,13 +282,9 @@ private:
Attribute<bool> m_EnablePassiveChecks;
Attribute<bool> m_ForceNextCheck;
ScriptTask::Ptr m_CurrentTask;
bool m_CheckRunning;
long m_SchedulingOffset;
void CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
const ScriptTask::Ptr& task, const boost::function<void (void)>& callback);
/* Downtimes */
Attribute<Dictionary::Ptr> m_Downtimes;

View File

@ -23,6 +23,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -24,6 +24,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
@ -209,16 +210,8 @@ void TimePeriod::UpdateRegion(double begin, double end)
arguments.push_back(self);
arguments.push_back(begin);
arguments.push_back(end);
ScriptTask::Ptr task = MakeMethodTask("update", arguments);
if (!task) {
Log(LogWarning, "icinga", "TimePeriod object '" + GetName() + "' doesn't have an 'update' method.");
return;
}
task->Start();
Array::Ptr segments = task->GetResult();
Array::Ptr segments = InvokeMethod("update", arguments);
{
ObjectLock olock(this);
@ -298,7 +291,7 @@ void TimePeriod::UpdateTimerHandler(void)
}
}
void TimePeriod::EmptyTimePeriodUpdate(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value TimePeriod::EmptyTimePeriodUpdate(const std::vector<Value>& arguments)
{
if (arguments.size() < 3)
BOOST_THROW_EXCEPTION(std::runtime_error("Expected 3 arguments."));
@ -308,10 +301,10 @@ void TimePeriod::EmptyTimePeriodUpdate(const ScriptTask::Ptr& task, const std::v
// double end = arguments[2];
Array::Ptr segments = boost::make_shared<Array>();
task->FinishResult(segments);
return segments;
}
void TimePeriod::EvenMinutesTimePeriodUpdate(const ScriptTask::Ptr& task, const std::vector<Value>& arguments)
Value TimePeriod::EvenMinutesTimePeriodUpdate(const std::vector<Value>& arguments)
{
if (arguments.size() < 3)
BOOST_THROW_EXCEPTION(std::runtime_error("Expected 3 arguments."));
@ -332,5 +325,5 @@ void TimePeriod::EvenMinutesTimePeriodUpdate(const ScriptTask::Ptr& task, const
}
}
task->FinishResult(segments);
return segments;
}

View File

@ -49,8 +49,8 @@ public:
bool IsInside(double ts) const;
double FindNextTransition(double begin);
static void EmptyTimePeriodUpdate(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static void EvenMinutesTimePeriodUpdate(const ScriptTask::Ptr& task, const std::vector<Value>& arguments);
static Value EmptyTimePeriodUpdate(const std::vector<Value>& arguments);
static Value EvenMinutesTimePeriodUpdate(const std::vector<Value>& arguments);
private:
Attribute<double> m_ValidBegin;

View File

@ -20,6 +20,7 @@
#include "icinga/user.h"
#include "icinga/usergroup.h"
#include "base/dynamictype.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
using namespace icinga;

View File

@ -22,6 +22,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/timer.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>

View File

@ -259,7 +259,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
if (it == m_TopicHandlers.end())
return;
Application::GetEQ().Post(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
GetClient()->SendMessage(request);
}