1
0
mirror of https://github.com/Icinga/icinga2.git synced 2025-04-07 20:25:08 +02:00

Fine-grained locks (WIP, Part 1).

This commit is contained in:
Gunnar Beutner 2013-02-17 19:14:34 +01:00
parent 8ca8486d5b
commit c63c28dd9c
71 changed files with 869 additions and 682 deletions

@ -53,27 +53,44 @@ void CheckerComponent::Stop(void)
void CheckerComponent::CheckTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
double now = Utility::GetTime();
long tasks = 0;
int missedServices = 0, missedChecks = 0;
while (!m_IdleServices.empty()) {
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
for (;;) {
Service::Ptr service;
CheckTimeView::iterator it = idx.begin();
Service::Ptr service = it->lock();
{
boost::mutex::scoped_lock lock(m_Mutex);
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
if (idx.begin() == idx.end())
break;
CheckTimeView::iterator it = idx.begin();
service = it->lock();
if (!service) {
idx.erase(it);
continue;
}
{
ObjectLock olock(service);
if (service->GetNextCheck() > now)
break;
}
if (!service) {
idx.erase(it);
continue;
}
if (service->GetNextCheck() > now)
break;
idx.erase(it);
ObjectLock olock(service);
/* reschedule the service if checks are currently disabled
* for it and this is not a forced check */
@ -83,7 +100,14 @@ void CheckerComponent::CheckTimerHandler(void)
service->UpdateNextCheck();
idx.insert(service);
{
boost::mutex::scoped_lock lock(m_Mutex);
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
idx.insert(service);
}
continue;
}
@ -136,19 +160,26 @@ void CheckerComponent::CheckTimerHandler(void)
void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
{
/* remove the service from the list of pending services; if it's not in the
* list this was a manual (i.e. forced) check and we must not re-add the
* service to the services list because it's already there. */
CheckerComponent::ServiceSet::iterator it;
it = m_PendingServices.find(service);
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
m_IdleServices.insert(service);
{
boost::mutex::scoped_lock lock(m_Mutex);
/* remove the service from the list of pending services; if it's not in the
* list this was a manual (i.e. forced) check and we must not re-add the
* service to the services list because it's already there. */
CheckerComponent::ServiceSet::iterator it;
it = m_PendingServices.find(service);
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
m_IdleServices.insert(service);
}
}
RescheduleCheckTimer();
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
{
ObjectLock olock(service);
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
}
}
void CheckerComponent::ResultTimerHandler(void)
@ -156,20 +187,35 @@ void CheckerComponent::ResultTimerHandler(void)
Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
stringstream msgbuf;
msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
{
boost::mutex::scoped_lock lock(m_Mutex);
msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
}
Logger::Write(LogInformation, "checker", msgbuf.str());
}
void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
String checker = service->GetChecker();
String checker;
{
ObjectLock olock(service);
checker = service->GetChecker();
}
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
boost::mutex::scoped_lock lock(m_Mutex);
if (m_PendingServices.find(service) != m_PendingServices.end())
return;
m_IdleServices.insert(service);
} else {
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.erase(service);
}
@ -177,16 +223,20 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
{
/* remove and re-insert the service from the set in order to force an index update */
typedef nth_index<ServiceSet, 0>::type ServiceView;
ServiceView& idx = boost::get<0>(m_IdleServices);
{
boost::mutex::scoped_lock lock(m_Mutex);
ServiceView::iterator it = idx.find(service);
if (it == idx.end())
return;
/* remove and re-insert the service from the set in order to force an index update */
typedef nth_index<ServiceSet, 0>::type ServiceView;
ServiceView& idx = boost::get<0>(m_IdleServices);
idx.erase(it);
idx.insert(service);
ServiceView::iterator it = idx.find(service);
if (it == idx.end())
return;
idx.erase(it);
idx.insert(service);
}
RescheduleCheckTimer();
}
@ -199,31 +249,40 @@ void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
if (!service)
return;
m_IdleServices.erase(service);
m_PendingServices.erase(service);
{
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.erase(service);
}
}
void CheckerComponent::RescheduleCheckTimer(void)
{
if (m_IdleServices.empty())
return;
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
Service::Ptr service;
do {
CheckTimeView::iterator it = idx.begin();
{
boost::mutex::scoped_lock lock(m_Mutex);
if (it == idx.end())
if (m_IdleServices.empty())
return;
service = it->lock();
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
if (!service)
idx.erase(it);
} while (!service);
do {
CheckTimeView::iterator it = idx.begin();
if (it == idx.end())
return;
service = it->lock();
if (!service)
idx.erase(it);
} while (!service);
}
ObjectLock olock(service);
m_CheckTimer->Reschedule(service->GetNextCheck());
}

@ -37,7 +37,10 @@ struct ServiceNextCheckExtractor
if (!service)
return 0;
return service->GetNextCheck();
{
ObjectLock olock(service);
return service->GetNextCheck();
}
}
};
@ -64,6 +67,8 @@ public:
private:
Endpoint::Ptr m_Endpoint;
boost::mutex m_Mutex;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;

@ -29,17 +29,6 @@
#include <i2-base.h>
#include <i2-icinga.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
using boost::multi_index_container;
using boost::multi_index::indexed_by;
using boost::multi_index::identity;
using boost::multi_index::ordered_unique;
using boost::multi_index::ordered_non_unique;
using boost::multi_index::nth_index;
#include "checkercomponent.h"
#endif /* I2CHECKER_H */

@ -124,7 +124,6 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
}
}
if (!fifo_ok && mkfifo(commandPath.CStr(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) < 0)
BOOST_THROW_EXCEPTION(PosixException("mkfifo() failed", errno));
@ -154,7 +153,12 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
line[strlen(line) - 1] = '\0';
String command = line;
Application::GetEQ().Post(boost::bind(&CompatComponent::ProcessCommand, this, command));
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessCommand(command);
}
}
fclose(fp);
@ -412,6 +416,8 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
*/
void CompatComponent::StatusTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "compat", "Writing compat status information");
String statuspath = GetStatusPath();

@ -82,6 +82,8 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
void DelegationComponent::DelegationTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
map<Endpoint::Ptr, int> histogram;
DynamicObject::Ptr object;

@ -54,6 +54,8 @@ void DemoComponent::Stop(void)
*/
void DemoComponent::DemoTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
" world' message.");

@ -53,6 +53,8 @@ void NotificationComponent::Stop(void)
*/
void NotificationComponent::NotificationTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
// TODO: implement
}

@ -32,7 +32,7 @@ void ReplicationComponent::Start(void)
DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2));
Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));

@ -66,23 +66,6 @@ static bool LoadConfigFiles(bool validateOnly)
if (hasError)
return false;
/* Logger::Write(LogInformation, "icinga-app", "Validating config items...");
DynamicType::Ptr type;
BOOST_FOREACH(tie(tuples::ignore, type), DynamicType::GetTypes()) {
ConfigType::Ptr ctype = ConfigType::GetByName(type->GetName());
if (!ctype) {
Logger::Write(LogWarning, "icinga-app", "No config type found for type '" + type->GetName() + "'");
continue;
}
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), type->GetObjects()) {
ctype->ValidateObject(object);
}
}*/
if (validateOnly)
return true;
@ -105,8 +88,13 @@ static bool LoadConfigFiles(bool validateOnly)
static void ReloadConfigTimerHandler(void)
{
if (g_ReloadConfig) {
Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
LoadConfigFiles(false);
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
LoadConfigFiles(false);
}
g_ReloadConfig = false;
}
}
@ -136,10 +124,6 @@ int main(int argc, char **argv)
lt_dlinit();
#endif /* _WIN32 */
/* This must be done before calling any other functions
* in the base library. */
Application::SetMainThread();
/* Set command-line arguments. */
Application::SetArgC(argc);
Application::SetArgV(argv);
@ -252,14 +236,14 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
DynamicObject::BeginTx();
DynamicObject::NewTx();
bool validateOnly = g_AppParams.count("validate");
if (!LoadConfigFiles(validateOnly))
return EXIT_FAILURE;
DynamicObject::FinishTx();
DynamicObject::NewTx();
if (validateOnly) {
Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate.");
@ -290,4 +274,3 @@ int main(int argc, char **argv)
return app->Run();
}

@ -21,7 +21,7 @@
using namespace icinga;
boost::mutex Application::m_Mutex;
recursive_mutex Application::m_Mutex;
Application *Application::m_Instance = NULL;
bool Application::m_ShuttingDown = false;
bool Application::m_Debugging = false;
@ -110,25 +110,26 @@ void Application::SetArgV(char **argv)
m_ArgV = argv;
}
/**
* Runs one iteration of the event loop.
*
* @returns false if we're shutting down, true otherwise.
*/
bool Application::ProcessEvents(void)
void Application::NewTxTimerHandler(void)
{
Object::ClearHeldObjects();
DynamicObject::NewTx();
}
double sleep = Timer::ProcessTimers();
#ifdef _DEBUG
void Application::ProfileTimerHandler(void)
{
stringstream msgbuf;
msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
Logger::Write(LogInformation, "base", msgbuf.str());
Object::PrintMemoryProfile();
}
#endif /* _DEBUG */
void Application::ShutdownTimerHandler(void)
{
if (m_ShuttingDown)
return false;
GetEQ().ProcessEvents(m_Mutex, boost::posix_time::milliseconds(sleep * 1000));
DynamicObject::FlushTx();
return true;
m_EQ.Stop();
}
/**
@ -137,32 +138,31 @@ bool Application::ProcessEvents(void)
*/
void Application::RunEventLoop(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
#ifdef _DEBUG
double nextProfile = 0;
#endif /* _DEBUG */
/* Start the system time watch thread. */
thread t(&Application::TimeWatchThreadProc);
t.detach();
while (!m_ShuttingDown) {
if (!ProcessEvents())
break;
/* Set up a timer to periodically flush the tx. */
Timer::Ptr newTxTimer = boost::make_shared<Timer>();
newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
newTxTimer->SetInterval(0.5);
newTxTimer->Start();
/* Set up a timer that watches the m_Shutdown flag. */
Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
shutdownTimer->SetInterval(0.5);
shutdownTimer->Start();
#ifdef _DEBUG
if (nextProfile < Utility::GetTime()) {
stringstream msgbuf;
msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
Logger::Write(LogInformation, "base", msgbuf.str());
Object::PrintMemoryProfile();
nextProfile = Utility::GetTime() + 15.0;
}
/* Set up a timer that periodically prints some information about the object system. */
Timer::Ptr profileTimer = boost::make_shared<Timer>();
profileTimer->OnTimerExpired.connect(boost::bind(&Application::ProfileTimerHandler));
flushTxTimer->SetInterval(15);
flushTxTimer->Start();
#endif /* _DEBUG */
}
GetEQ().Run();
}
/**
@ -186,12 +186,7 @@ void Application::TimeWatchThreadProc(void)
<< " in time: " << abs(timeDiff) << " seconds";
Logger::Write(LogInformation, "base", msgbuf.str());
/* in addition to rescheduling the timers this
* causes the event loop to wake up thereby
* solving the problem that timed_wait()
* uses an absolute timestamp for the timeout */
GetEQ().Post(boost::bind(&Timer::AdjustTimers,
-timeDiff));
Timer::AdjustTimers(-timeDiff);
}
lastLoop = now;
@ -302,25 +297,6 @@ bool Application::IsDebugging(void)
return m_Debugging;
}
/**
* Checks whether we're currently on the main thread.
*
* @returns true if this is the main thread, false otherwise
*/
bool Application::IsMainThread(void)
{
return (boost::this_thread::get_id() == m_MainThreadID);
}
/**
* Sets the main thread to the currently running thread.
*/
void Application::SetMainThread(void)
{
m_MainThreadID = boost::this_thread::get_id();
m_EQ.SetOwner(m_MainThreadID);
}
/**
* Displays a message that tells users what to do when they encounter a bug.
*/
@ -455,11 +431,11 @@ int Application::Run(void)
SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE);
#endif /* _WIN32 */
DynamicObject::BeginTx();
DynamicObject::NewTx();
result = Main();
DynamicObject::FinishTx();
DynamicObject::NewTx();
DynamicObject::DeactivateObjects();
return result;
@ -594,11 +570,11 @@ void Application::SetPkgDataDir(const String& path)
}
/**
* Returns the global mutex for the main thread.
* Returns the global mutex.
*
* @returns The mutex.
*/
boost::mutex& Application::GetMutex(void)
recursive_mutex& Application::GetMutex(void)
{
return m_Mutex;
}

@ -62,9 +62,6 @@ public:
static void SetDebugging(bool debug);
static bool IsDebugging(void);
static bool IsMainThread(void);
static void SetMainThread(void);
void UpdatePidFile(const String& filename);
void ClosePidFile(void);
@ -82,9 +79,7 @@ public:
static String GetPkgDataDir(void);
static void SetPkgDataDir(const String& path);
static bool ProcessEvents(void);
static boost::mutex& GetMutex(void);
static recursive_mutex& GetMutex(void);
static EventQueue& GetEQ(void);
@ -92,7 +87,7 @@ protected:
void RunEventLoop(void) const;
private:
static boost::mutex m_Mutex; /**< The main thread mutex. */
static recursive_mutex m_Mutex; /**< The global mutex. */
static Application *m_Instance; /**< The application instance. */
static bool m_ShuttingDown; /**< Whether the application is in the process of
@ -120,6 +115,11 @@ private:
static void ExceptionHandler(void);
static void TimeWatchThreadProc(void);
static void NewTxTimerHandler(void);
#ifdef _DEBUG
static void ProfileTimerHandler(void)
#endif /* _DEBUG */
static void ShutdownTimerHandler(void);
};
}

@ -18,7 +18,7 @@
******************************************************************************/
#ifndef ASYNCTASK_H
#define ASYNCTASK_H
#define ASYNCTASK_H
namespace icinga
{
@ -79,6 +79,7 @@ public:
*/
bool IsFinished(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_Finished;
}
@ -133,7 +134,9 @@ public:
*/
void Wait(void)
{
Utility::WaitUntil(boost::bind(&AsyncTask<TClass, TResult>::IsFinished, this));
boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Finished)
m_CV.wait(lock);
}
protected:
@ -151,9 +154,14 @@ private:
*/
void FinishInternal(void)
{
assert(!m_Finished);
{
boost::mutex::scoped_lock lock(m_Mutex);
assert(!m_Finished);
m_Finished = true;
m_Finished = true;
m_CV.notify_all();
}
if (!m_CompletionCallback.empty()) {
m_CompletionCallback(GetSelf());
@ -164,6 +172,8 @@ private:
}
}
mutable boost::mutex m_Mutex;
boost::condition_variable m_CV;
CompletionCallback m_CompletionCallback; /**< The completion callback. */
TResult m_Result; /**< The task's result. */
boost::exception_ptr m_Exception; /**< The task's exception. */

@ -29,8 +29,6 @@ REGISTER_TYPE(Component, NULL);
Component::Component(const Dictionary::Ptr& properties)
: DynamicObject(properties)
{
assert(Application::IsMainThread());
if (!IsLocal())
BOOST_THROW_EXCEPTION(runtime_error("Component objects must be local."));

@ -35,7 +35,7 @@ public:
void Close(void);
boost::signal<void (const Connection::Ptr&)> OnClosed;
signals2::signal<void (const Connection::Ptr&)> OnClosed;
protected:
virtual void ProcessData(void) = 0;

@ -23,10 +23,11 @@ using namespace icinga;
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject *> DynamicObject::m_ModifiedObjects;
boost::mutex DynamicObject::m_ModifiedObjectsMutex;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
boost::signal<void (const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
signals2::signal<void (double, const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
: m_ConfigTx(0)
@ -47,8 +48,12 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
ApplyUpdate(serializedObject, Attribute_Config);
}
/*
* @threadsafety Always.
*/
DynamicObject::~DynamicObject(void)
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
m_ModifiedObjects.erase(this);
}
@ -193,7 +198,10 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
if (tt.first->second.Type & Attribute_Config)
m_ConfigTx = tx;
m_ModifiedObjects.insert(this);
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
m_ModifiedObjects.insert(this);
}
/* Use insert() rather than [] so we don't overwrite
* an existing oldValue if the attribute was previously
@ -272,7 +280,7 @@ String DynamicObject::GetSource(void) const
void DynamicObject::Register(void)
{
assert(Application::IsMainThread());
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
@ -294,7 +302,7 @@ void DynamicObject::Start(void)
void DynamicObject::Unregister(void)
{
assert(Application::IsMainThread());
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
@ -331,8 +339,13 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
return task;
}
/*
* @threadsafety Always.
*/
void DynamicObject::DumpObjects(const String& filename)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
String tempFilename = filename + ".tmp";
@ -391,8 +404,13 @@ void DynamicObject::DumpObjects(const String& filename)
BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
}
/*
* @threadsafety Always.
*/
void DynamicObject::RestoreObjects(const String& filename)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
std::fstream fp;
@ -437,8 +455,13 @@ void DynamicObject::RestoreObjects(const String& filename)
Logger::Write(LogDebug, "base", msgbuf.str());
}
/*
* @threadsafety Always.
*/
void DynamicObject::DeactivateObjects(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::TypeMap::iterator tt;
for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
DynamicType::NameMap::iterator nt;
@ -451,34 +474,42 @@ void DynamicObject::DeactivateObjects(void)
}
}
/*
* @threadsafety Always.
*/
double DynamicObject::GetCurrentTx(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
assert(m_CurrentTx != 0);
return m_CurrentTx;
}
void DynamicObject::BeginTx(void)
/*
* @threadsafety Always.
*/
void DynamicObject::NewTx(void)
{
m_CurrentTx = Utility::GetTime();
}
set<DynamicObject *> objects;
void DynamicObject::FinishTx(void)
{
BOOST_FOREACH(DynamicObject *object, m_ModifiedObjects) {
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
/* Some objects may accidentally bleed into the next transaction because
* we're not holding the global mutex while "stealing" the modified objects,
* but that's entirely ok. */
m_ModifiedObjects.swap(objects);
}
recursive_mutex::scoped_lock lock(Application::GetMutex());
BOOST_FOREACH(DynamicObject *object, objects) {
object->SendLocalUpdateEvents();
}
OnTransactionClosing(m_ModifiedObjects);
m_ModifiedObjects.clear();
m_CurrentTx = 0;
}
void DynamicObject::FlushTx(void)
{
FinishTx();
BeginTx();
OnTransactionClosing(m_CurrentTx, objects);
m_CurrentTx = Utility::GetTime();
}
void DynamicObject::OnInitCompleted(void)
@ -487,8 +518,13 @@ void DynamicObject::OnInitCompleted(void)
void DynamicObject::OnAttributeChanged(const String&, const Value&)
{ }
/*
* @threadsafety Always.
*/
DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = DynamicType::GetByName(type);
return dtype->GetObject(name);
}

@ -94,9 +94,9 @@ public:
void ClearAttributesByType(DynamicAttributeType type);
static boost::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static boost::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static boost::signal<void (const set<DynamicObject *>&)> OnTransactionClosing;
static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static signals2::signal<void (double, const set<DynamicObject *>&)> OnTransactionClosing;
ScriptTask::Ptr InvokeMethod(const String& method,
const vector<Value>& arguments, ScriptTask::CompletionCallback callback);
@ -127,9 +127,7 @@ public:
static void DeactivateObjects(void);
static double GetCurrentTx(void);
static void BeginTx(void);
static void FinishTx(void);
static void FlushTx(void);
static void NewTx(void);
protected:
virtual void OnInitCompleted(void);
@ -149,6 +147,7 @@ private:
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject *> m_ModifiedObjects;
static boost::mutex m_ModifiedObjectsMutex;
friend class DynamicType; /* for OnInitCompleted. */
};

@ -21,12 +21,19 @@
using namespace icinga;
boost::mutex DynamicType::m_Mutex;
DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
: m_Name(name), m_ObjectFactory(factory)
{ }
/**
* @threadsafety Always.
*/
DynamicType::Ptr DynamicType::GetByName(const String& name)
{
boost::mutex::scoped_lock lock(m_Mutex);
DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
if (tt == GetTypes().end())
@ -35,12 +42,18 @@ DynamicType::Ptr DynamicType::GetByName(const String& name)
return tt->second;
}
/**
* @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
*/
DynamicType::TypeMap& DynamicType::GetTypes(void)
{
static DynamicType::TypeMap types;
return types;
}
/**
* @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
*/
DynamicType::NameMap& DynamicType::GetObjects(void)
{
return m_Objects;
@ -71,9 +84,16 @@ DynamicObject::Ptr DynamicType::GetObject(const String& name) const
return nt->second;
}
/**
* @threadsafety Always.
*/
void DynamicType::RegisterType(const DynamicType::Ptr& type)
{
if (GetByName(type->GetName()))
boost::mutex::scoped_lock lock(m_Mutex);
DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
if (tt != GetTypes().end())
BOOST_THROW_EXCEPTION(runtime_error("Cannot register class for type '" +
type->GetName() + "': Objects of this type already exist."));
@ -99,6 +119,9 @@ DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUp
return obj;
}
/**
* @threadsafety Always.
*/
bool DynamicType::TypeExists(const String& name)
{
return (GetByName(name));

@ -47,15 +47,15 @@ public:
static void RegisterType(const DynamicType::Ptr& type);
static bool TypeExists(const String& name);
DynamicObject::Ptr CreateObject(const Dictionary::Ptr& serializedUpdate) const;
DynamicObject::Ptr GetObject(const String& name) const;
void RegisterObject(const DynamicObject::Ptr& object);
void UnregisterObject(const DynamicObject::Ptr& object);
static TypeMap& GetTypes(void);
NameMap& GetObjects(void);
/* TODO(thread) make private */ static TypeMap& GetTypes(void);
/* TODO(thread) make private */ NameMap& GetObjects(void);
void AddAttribute(const String& name, DynamicAttributeType type);
void RemoveAttribute(const String& name);
@ -64,6 +64,7 @@ public:
void AddAttributes(const AttributeDescription *attributes, int attributeCount);
private:
static boost::mutex m_Mutex;
String m_Name;
ObjectFactory m_ObjectFactory;
map<String, DynamicAttributeType> m_Attributes;

@ -21,91 +21,95 @@
using namespace icinga;
/**
* @threadsafety Always.
*/
EventQueue::EventQueue(void)
: m_Stopped(false)
{ }
boost::thread::id EventQueue::GetOwner(void) const
/**
* @threadsafety Always.
*/
EventQueue::~EventQueue(void)
{
return m_Owner;
}
void EventQueue::SetOwner(boost::thread::id owner)
{
m_Owner = owner;
Stop();
}
/**
* @threadsafety Always.
*/
void EventQueue::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
m_EventAvailable.notify_all();
m_CV.notify_all();
}
/**
* Waits for events using the specified timeout value and processes
* them.
* Spawns worker threads and waits for them to complete.
*
* @param mtx The mutex that should be unlocked while waiting. Caller
* must have this mutex locked.
* @param timeout The wait timeout.
* @returns false if the queue has been stopped, true otherwise.
* @threadsafety Always.
*/
bool EventQueue::ProcessEvents(boost::mutex& mtx, millisec timeout)
void EventQueue::Run(void)
{
vector<Callback> events;
thread_group threads;
mtx.unlock();
int cpus = thread::hardware_concurrency();
{
boost::mutex::scoped_lock lock(m_Mutex);
if (cpus == 0)
cpus = 4;
while (m_Events.empty() && !m_Stopped) {
if (!m_EventAvailable.timed_wait(lock, timeout)) {
mtx.lock();
for (int i = 0; i < cpus * 4; i++)
threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
return !m_Stopped;
threads.join_all();
}
/**
* Waits for events and processes them.
*
* @threadsafety Always.
*/
void EventQueue::QueueThreadProc(void)
{
while (!m_Stopped) {
vector<Callback> events;
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_Events.empty() && !m_Stopped)
m_CV.wait(lock);
events.swap(m_Events);
}
BOOST_FOREACH(const Callback& ev, events) {
double st = Utility::GetTime();
ev();
double et = Utility::GetTime();
if (et - st > 1.0) {
stringstream msgbuf;
msgbuf << "Event call took " << et - st << " seconds.";
Logger::Write(LogWarning, "base", msgbuf.str());
}
}
events.swap(m_Events);
}
mtx.lock();
BOOST_FOREACH(const Callback& ev, events) {
double st = Utility::GetTime();
ev();
double et = Utility::GetTime();
if (et - st > 1.0) {
stringstream msgbuf;
msgbuf << "Event call took " << et - st << " seconds.";
Logger::Write(LogWarning, "base", msgbuf.str());
}
}
return !m_Stopped;
}
/**
* Appends an event to the event queue. Events will be processed in FIFO
* order on the main thread.
* Appends an event to the event queue. Events will be processed in FIFO order.
*
* @param callback The callback function for the event.
* @threadsafety Always.
*/
void EventQueue::Post(const EventQueue::Callback& callback)
{
if (boost::this_thread::get_id() == m_Owner) {
callback();
return;
}
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(callback);
m_EventAvailable.notify_all();
}
boost::mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(callback);
m_CV.notify_all();
}

@ -34,24 +34,23 @@ public:
typedef function<void ()> Callback;
EventQueue(void);
~EventQueue(void);
bool ProcessEvents(boost::mutex& mtx, millisec timeout = boost::posix_time::milliseconds(30000));
void Run(void);
void Post(const Callback& callback);
void Stop(void);
boost::thread::id GetOwner(void) const;
void SetOwner(boost::thread::id owner);
boost::mutex& GetMutex(void);
private:
boost::thread::id m_Owner;
boost::mutex m_Mutex;
condition_variable m_CV;
bool m_Stopped;
vector<Callback> m_Events;
condition_variable m_EventAvailable;
void QueueThreadProc(void);
};
}

@ -125,7 +125,7 @@ using std::type_info;
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/signal.hpp>
#include <boost/signals2.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/compare.hpp>
@ -139,6 +139,9 @@ using std::type_info;
#include <boost/uuid/uuid_io.hpp>
#include <boost/program_options.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
using boost::shared_ptr;
using boost::weak_ptr;
@ -148,6 +151,7 @@ using boost::static_pointer_cast;
using boost::function;
using boost::thread;
using boost::thread_group;
using boost::recursive_mutex;
using boost::condition_variable;
using boost::system_time;
using boost::posix_time::millisec;
@ -155,11 +159,18 @@ using boost::tie;
using boost::rethrow_exception;
using boost::current_exception;
using boost::diagnostic_information;
using boost::multi_index_container;
using boost::multi_index::indexed_by;
using boost::multi_index::identity;
using boost::multi_index::ordered_unique;
using boost::multi_index::ordered_non_unique;
using boost::multi_index::nth_index;
namespace tuples = boost::tuples;
namespace signals2 = boost::signals2;
#if defined(__APPLE__) && defined(__MACH__)
# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <openssl/bio.h>

@ -81,7 +81,10 @@ void Logger::Write(LogSeverity severity, const String& facility,
entry.Facility = facility;
entry.Message = message;
Application::GetEQ().Post(boost::bind(&Logger::ForwardLogEntry, entry));
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ForwardLogEntry(entry);
}
}
/**
@ -182,4 +185,3 @@ DynamicObject::Ptr ILogger::GetConfig(void) const
{
return m_Config->GetSelf();
}

@ -29,6 +29,7 @@ using namespace icinga;
* @returns true if a complete String was read from the IOQueue, false otherwise.
* @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netString-c/blob/master/netString.c
* @threadsafety Always.
*/
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
{
@ -110,6 +111,7 @@ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
*
* @param stream The stream.
* @param str The String that is to be written.
* @threadsafety Always.
*/
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
{

@ -25,43 +25,13 @@ using namespace icinga;
* Default constructor for the Object class.
*/
Object::Object(void)
{
#ifdef _DEBUG
boost::mutex::scoped_lock lock(*GetMutex());
GetAliveObjects()->insert(this);
#endif /* _DEBUG */
}
{ }
/**
* Destructor for the Object class.
*/
Object::~Object(void)
{
#ifdef _DEBUG
boost::mutex::scoped_lock lock(*GetMutex());
GetAliveObjects()->erase(this);
#endif /* _DEBUG */
}
/**
* Temporarily holds onto a reference for an object. This can
* be used to safely clear the last reference to an object
* in an event handler.
*/
void Object::Hold(void)
{
boost::mutex::scoped_lock lock(*GetMutex());
GetHeldObjects().push_back(GetSelf());
}
/**
* Clears all temporarily held objects.
*/
void Object::ClearHeldObjects(void)
{
boost::mutex::scoped_lock lock(*GetMutex());
GetHeldObjects().clear();
}
{ }
/**
* Returns a reference-counted pointer to this object.
@ -73,91 +43,14 @@ Object::SharedPtrHolder Object::GetSelf(void)
return Object::SharedPtrHolder(shared_from_this());
}
#ifdef _DEBUG
/**
* Retrieves the number of currently alive objects.
* Returns the mutex that must be held while calling non-static methods
* which have not been explicitly marked as thread-safe.
*
* @returns The number of alive objects.
* @returns The object's mutex.
* @threadsafety Always.
*/
int Object::GetAliveObjectsCount(void)
recursive_mutex& Object::GetMutex(void)
{
boost::mutex::scoped_lock lock(*GetMutex());
return GetAliveObjects()->size();
return m_Mutex;
}
/**
* Dumps a memory histogram to the "dictionaries.dump" file.
*/
void Object::PrintMemoryProfile(void)
{
map<String, int> types;
ofstream dictfp("dictionaries.dump.tmp");
{
boost::mutex::scoped_lock lock(*GetMutex());
set<Object *>::iterator it;
BOOST_FOREACH(Object *obj, *GetAliveObjects()) {
pair<map<String, int>::iterator, bool> tt;
tt = types.insert(make_pair(Utility::GetTypeName(typeid(*obj)), 1));
if (!tt.second)
tt.first->second++;
if (typeid(*obj) == typeid(Dictionary)) {
Dictionary::Ptr dict = obj->GetSelf();
dictfp << Value(dict).Serialize() << std::endl;
}
}
}
#ifdef _WIN32
_unlink("dictionaries.dump");
#endif /* _WIN32 */
dictfp.close();
if (rename("dictionaries.dump.tmp", "dictionaries.dump") < 0)
BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
String type;
int count;
BOOST_FOREACH(tie(type, count), types) {
std::cerr << type << ": " << count << std::endl;
}
}
/**
* Returns currently active objects.
*
* @returns currently active objects
*/
set<Object *> *Object::GetAliveObjects(void)
{
static set<Object *> *aliveObjects = new set<Object *>();
return aliveObjects;
}
#endif /* _DEBUG */
/**
* Returns the mutex used for accessing static members.
*
* @returns a mutex
*/
boost::mutex *Object::GetMutex(void)
{
static boost::mutex *mutex = new boost::mutex();
return mutex;
}
/**
* Returns currently held objects. The caller must be
* holding the mutex returned by GetMutex().
*
* @returns currently held objects
*/
vector<Object::Ptr>& Object::GetHeldObjects(void)
{
static vector<Object::Ptr> heldObjects;
return heldObjects;
}

@ -31,15 +31,12 @@ class SharedPtrHolder;
*
* @ingroup base
*/
class I2_BASE_API Object : public enable_shared_from_this<Object>, public boost::signals::trackable
class I2_BASE_API Object : public enable_shared_from_this<Object>
{
public:
typedef shared_ptr<Object> Ptr;
typedef weak_ptr<Object> WeakPtr;
void Hold(void);
static void ClearHeldObjects(void);
/**
* Holds a shared pointer and provides support for implicit upcasts.
*
@ -96,10 +93,7 @@ public:
SharedPtrHolder GetSelf(void);
#ifdef _DEBUG
static int GetAliveObjectsCount(void);
static void PrintMemoryProfile(void);
#endif /* _DEBUG */
recursive_mutex& GetMutex(void);
protected:
Object(void);
@ -109,9 +103,24 @@ private:
Object(const Object& other);
Object& operator=(const Object& rhs);
static boost::mutex *GetMutex(void);
static set<Object *> *GetAliveObjects(void);
static vector<Object::Ptr>& GetHeldObjects(void);
recursive_mutex m_Mutex;
};
/**
* A scoped lock for Objects.
*/
struct ObjectLock {
public:
ObjectLock(const Object::Ptr& object)
: m_Lock(object->GetMutex())
{ }
ObjectLock(Object *object)
: m_Lock(object->GetMutex())
{ }
private:
recursive_mutex::scoped_lock m_Lock;
};
/**

@ -26,7 +26,7 @@ using namespace icinga;
int Process::m_TaskFd;
extern char **environ;
void Process::CreateWorkers(void)
void Process::Initialize(void)
{
int fds[2];

@ -22,7 +22,7 @@
using namespace icinga;
void Process::CreateWorkers(void)
void Process::Initialize(void)
{
// TODO: implement
}

@ -21,20 +21,14 @@
using namespace icinga;
bool Process::m_WorkersCreated = false;
boost::once_flag Process::m_ThreadOnce;
boost::mutex Process::m_Mutex;
deque<Process::Ptr> Process::m_Tasks;
Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
: AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
{
assert(Application::IsMainThread());
if (!m_WorkersCreated) {
CreateWorkers();
m_WorkersCreated = true;
}
boost::call_once(&Process::Initialize, m_ThreadOnce);
#ifndef _WIN32
m_FD = -1;

@ -54,8 +54,6 @@ public:
static vector<String> SplitCommand(const Value& command);
private:
static bool m_WorkersCreated;
vector<String> m_Arguments;
Dictionary::Ptr m_ExtraEnvironment;
@ -76,7 +74,6 @@ private:
static int m_TaskFd;
#endif /* _WIN32 */
static void CreateWorkers(void);
static void NotifyWorker(void);
void SpawnTask(void);
@ -89,6 +86,9 @@ private:
void InitTask(void);
bool RunTask(void);
static boost::once_flag m_ThreadOnce;
static void Initialize(void);
};
}

@ -32,12 +32,6 @@ Script::Script(const Dictionary::Ptr& properties)
: DynamicObject(properties)
{ }
Script::~Script(void)
{
if (m_Interpreter)
m_Interpreter->Stop();
}
void Script::OnInitCompleted(void)
{
SpawnInterpreter();
@ -63,10 +57,6 @@ void Script::SpawnInterpreter(void)
{
Logger::Write(LogInformation, "base", "Reloading script '" + GetName() + "'");
if (m_Interpreter)
m_Interpreter->Stop();
ScriptLanguage::Ptr language = ScriptLanguage::GetByName(GetLanguage());
m_Interpreter = language->CreateInterpreter(GetSelf());
m_Interpreter->Start();
}

@ -37,7 +37,6 @@ public:
typedef weak_ptr<Script> WeakPtr;
Script(const Dictionary::Ptr& properties);
~Script(void);
String GetLanguage(void) const;
String GetCode(void) const;

@ -21,8 +21,8 @@
using namespace icinga;
boost::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
boost::signal<void (const String&)> ScriptFunction::OnUnregistered;
signals2::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
signals2::signal<void (const String&)> ScriptFunction::OnUnregistered;
ScriptFunction::ScriptFunction(const Callback& function)
: m_Callback(function)
@ -31,13 +31,13 @@ ScriptFunction::ScriptFunction(const Callback& function)
void ScriptFunction::Register(const String& name, const ScriptFunction::Ptr& function)
{
GetFunctions()[name] = function;
Application::GetEQ().Post(boost::bind(boost::ref(OnRegistered), name, function));
OnRegistered(name, function);
}
void ScriptFunction::Unregister(const String& name)
{
GetFunctions().erase(name);
Application::GetEQ().Post(boost::bind(boost::ref(OnUnregistered), name));
OnUnregistered(name);
}
ScriptFunction::Ptr ScriptFunction::GetByName(const String& name)

@ -46,10 +46,10 @@ public:
void Invoke(const shared_ptr<ScriptTask>& task, const vector<Value>& arguments);
static map<String, ScriptFunction::Ptr>& GetFunctions(void);
/* TODO(thread) make private */ static map<String, ScriptFunction::Ptr>& GetFunctions(void);
static boost::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
static boost::signal<void (const String&)> OnUnregistered;
static signals2::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
static signals2::signal<void (const String&)> OnUnregistered;
private:
Callback m_Callback;

@ -26,55 +26,16 @@ ScriptInterpreter::ScriptInterpreter(const Script::Ptr& script)
ScriptInterpreter::~ScriptInterpreter(void)
{
Stop();
}
void ScriptInterpreter::Start(void)
{
/* We can't start the thread in the constructor because
* the worker thread might end up calling one of the virtual
* methods before the object is fully constructed. */
m_Thread = boost::thread(&ScriptInterpreter::ThreadWorkerProc, this);
}
void ScriptInterpreter::Stop(void)
{
assert(Application::IsMainThread());
m_EQ.Stop();
BOOST_FOREACH(const String& function, m_SubscribedFunctions) {
ScriptFunction::Unregister(function);
}
m_Thread.join();
}
void ScriptInterpreter::ThreadWorkerProc(void)
{
m_EQ.SetOwner(boost::this_thread::get_id());
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_EQ.ProcessEvents(m_Mutex))
; /* empty loop */
}
}
void ScriptInterpreter::ScriptFunctionThunk(const ScriptTask::Ptr& task,
const String& function, const vector<Value>& arguments)
{
m_EQ.Post(boost::bind(&ScriptInterpreter::ProcessCall, this,
task, function, arguments));
}
void ScriptInterpreter::SubscribeFunction(const String& name)
{
m_SubscribedFunctions.insert(name);
ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ScriptFunctionThunk, this, _1, name, _2));
ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2));
ScriptFunction::Register(name, sf);
}

@ -36,9 +36,6 @@ public:
~ScriptInterpreter(void);
void Start(void);
void Stop(void);
protected:
ScriptInterpreter(const Script::Ptr& script);
@ -49,15 +46,7 @@ protected:
void UnsubscribeFunction(const String& name);
private:
boost::mutex m_Mutex;
EventQueue m_EQ;
set<String> m_SubscribedFunctions;
boost::thread m_Thread;
void ThreadWorkerProc(void);
void ScriptFunctionThunk(const ScriptTask::Ptr& task, const String& function,
const vector<Value>& arguments);
};
}

@ -24,18 +24,33 @@ using namespace icinga;
ScriptLanguage::ScriptLanguage(void)
{ }
/**
* @threadsafety Always.
*/
void ScriptLanguage::Register(const String& name, const ScriptLanguage::Ptr& language)
{
boost::mutex::scoped_lock lock(GetMutex());
GetLanguages()[name] = language;
}
/**
* @threadsafety Always.
*/
void ScriptLanguage::Unregister(const String& name)
{
boost::mutex::scoped_lock lock(GetMutex());
GetLanguages().erase(name);
}
/**
* @threadsafety Always.
*/
ScriptLanguage::Ptr ScriptLanguage::GetByName(const String& name)
{
boost::mutex::scoped_lock lock(GetMutex());
map<String, ScriptLanguage::Ptr>::iterator it;
it = GetLanguages().find(name);
@ -46,6 +61,12 @@ ScriptLanguage::Ptr ScriptLanguage::GetByName(const String& name)
return it->second;
}
boost::mutex& ScriptLanguage::GetMutex(void)
{
static boost::mutex mutex;
return mutex;
}
map<String, ScriptLanguage::Ptr>& ScriptLanguage::GetLanguages(void)
{
static map<String, ScriptLanguage::Ptr> languages;

@ -47,6 +47,7 @@ protected:
ScriptLanguage(void);
private:
static boost::mutex& GetMutex(void);
static map<String, ScriptLanguage::Ptr>& GetLanguages(void);
};

@ -532,7 +532,7 @@ void Socket::HandleReadableClient(void)
}
if (new_data)
Application::GetEQ().Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
OnDataAvailable(GetSelf());
}
void Socket::HandleWritableServer(void)
@ -557,7 +557,7 @@ void Socket::HandleReadableServer(void)
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->SetFD(fd);
Application::GetEQ().Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
OnNewClient(GetSelf(), client);
}
/**

@ -51,7 +51,7 @@ public:
void Listen(void);
boost::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
protected:
Socket(void);

@ -40,9 +40,9 @@ void Stream::SetConnected(bool connected)
m_Connected = connected;
if (m_Connected)
Application::GetEQ().Post(boost::bind(boost::ref(OnConnected), GetSelf()));
OnConnected(GetSelf());
else
Application::GetEQ().Post(boost::bind(boost::ref(OnClosed), GetSelf()));
OnClosed(GetSelf());
}
/**

@ -85,9 +85,9 @@ public:
boost::exception_ptr GetException(void);
void CheckException(void);
boost::signal<void (const Stream::Ptr&)> OnConnected;
boost::signal<void (const Stream::Ptr&)> OnDataAvailable;
boost::signal<void (const Stream::Ptr&)> OnClosed;
signals2::signal<void (const Stream::Ptr&)> OnConnected;
signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
signals2::signal<void (const Stream::Ptr&)> OnClosed;
protected:
void SetConnected(bool connected);

@ -21,76 +21,52 @@
using namespace icinga;
Timer::CollectionType Timer::m_Timers;
Timer::TimerSet Timer::m_Timers;
boost::mutex Timer::m_Mutex;
boost::condition_variable Timer::m_CV;
boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT;
/**
* Constructor for the Timer class.
*/
Timer::Timer(void)
: m_Interval(0)
{ }
/**
* Calls expired timers and returned when the next wake-up should happen.
* Extracts the next timestamp from a Timer.
*
* @returns Time when the next timer is due.
* @param wtimer Weak pointer to the timer.
* @returns The next timestamp
* @threadsafety Caller must hold Timer::m_Mutex.
*/
double Timer::ProcessTimers(void)
double TimerNextExtractor::operator()(const Timer::WeakPtr& wtimer)
{
double wakeup = 30; /* wake up at least once after this many seconds */
Timer::Ptr timer = wtimer.lock();
double st = Utility::GetTime();
if (!timer)
return 0;
Timer::CollectionType::iterator prev, i;
for (i = m_Timers.begin(); i != m_Timers.end(); ) {
Timer::Ptr timer = i->lock();
prev = i;
i++;
if (!timer) {
m_Timers.erase(prev);
continue;
}
double now = Utility::GetTime();
if (timer->m_Next <= now) {
timer->Call();
/* time may have changed depending on how long the
* timer call took - we need to fetch the current time */
now = Utility::GetTime();
double next = now + timer->GetInterval();
if (timer->m_Next <= now || next < timer->m_Next)
timer->Reschedule(next);
}
assert(timer->m_Next > now);
if (timer->m_Next - now < wakeup)
wakeup = timer->m_Next - now;
}
assert(wakeup > 0);
double et = Utility::GetTime();
if (et - st > 0.01) {
stringstream msgbuf;
msgbuf << "Timers took " << et - st << " seconds";
Logger::Write(LogDebug, "base", msgbuf.str());
}
return wakeup;
return timer->m_Next;
}
/**
* Calls this timer. Note: the timer delegate must not call
* Disable() on any other timers than the timer that originally
* invoked the delegate.
* Constructor for the Timer class.
*
* @threadsafety Always.
*/
Timer::Timer(void)
: m_Interval(0), m_Next(0)
{ }
/**
* Initializes the timer sub-system.
*
* @threadsafety Always.
*/
void Timer::Initialize(void)
{
thread worker(boost::bind(&Timer::TimerThreadProc));
worker.detach();
}
/**
* Calls this timer.
*
* @threadsafety Always.
*/
void Timer::Call(void)
{
@ -105,15 +81,19 @@ void Timer::Call(void)
msgbuf << "Timer call took " << et - st << " seconds.";
Logger::Write(LogWarning, "base", msgbuf.str());
}
Reschedule();
}
/**
* Sets the interval for this timer.
*
* @param interval The new interval.
* @threadsafety Always.
*/
void Timer::SetInterval(double interval)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Interval = interval;
}
@ -121,44 +101,81 @@ void Timer::SetInterval(double interval)
* Retrieves the interval for this timer.
*
* @returns The interval.
* @threadsafety Always.
*/
double Timer::GetInterval(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_Interval;
}
/**
* Registers the timer and starts processing events for it.
*
* @threadsafety Always.
*/
void Timer::Start(void)
{
assert(Application::IsMainThread());
boost::call_once(&Timer::Initialize, m_ThreadOnce);
Stop();
m_Timers.push_back(GetSelf());
Reschedule(Utility::GetTime() + m_Interval);
Reschedule();
}
/**
* Unregisters the timer and stops processing events for it.
*
* @threadsafety Always.
*/
void Timer::Stop(void)
{
assert(Application::IsMainThread());
boost::mutex::scoped_lock lock(m_Mutex);
m_Timers.erase(GetSelf());
m_Timers.remove_if(WeakPtrEqual<Timer>(this));
/* Notify the worker thread that we've disabled a timer. */
m_CV.notify_all();
}
/**
* Reschedules this timer.
*
* @param next The time when this timer should be called again.
* @param next The time when this timer should be called again. Use -1 to let
* the timer figure out a suitable time based on the interval.
* @threadsafety Always.
*/
void Timer::Reschedule(double next)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (next < 0) {
double now = Utility::GetTime();
next = m_Next + m_Interval;
if (next < now)
next = now + m_Interval;
else
next = next;
}
m_Next = next;
/* Remove and re-add the timer to update the index. */
m_Timers.erase(GetSelf());
m_Timers.insert(GetSelf());
/* Notify the worker that we've rescheduled a timer. */
m_CV.notify_all();
}
/**
* Retrieves when the timer is next due.
*
* @returns The timestamp.
* @threadsafety Always.
*/
double Timer::GetNext(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_Next;
}
/**
@ -166,17 +183,76 @@ void Timer::Reschedule(double next)
* next scheduled timestamp.
*
* @param adjustment The adjustment.
* @threadsafety Always.
*/
void Timer::AdjustTimers(double adjustment)
{
boost::mutex::scoped_lock lock(m_Mutex);
double now = Utility::GetTime();
Timer::CollectionType::iterator i;
for (i = m_Timers.begin(); i != m_Timers.end(); i++) {
Timer::Ptr timer = i->lock();
typedef nth_index<TimerSet, 1>::type TimerView;
TimerView& idx = boost::get<1>(m_Timers);
TimerView::iterator it;
for (it = idx.begin(); it != idx.end(); it++) {
Timer::Ptr timer = it->lock();
if (abs(now - (timer->m_Next + adjustment)) <
abs(now - timer->m_Next))
abs(now - timer->m_Next)) {
timer->m_Next += adjustment;
m_Timers.erase(timer);
m_Timers.insert(timer);
}
}
/* Notify the worker that we've rescheduled some timers. */
m_CV.notify_all();
}
/**
* Worker thread proc for Timer objects.
*
* @threadsafety Always.
*/
void Timer::TimerThreadProc(void)
{
for (;;) {
boost::mutex::scoped_lock lock(m_Mutex);
typedef nth_index<TimerSet, 1>::type NextTimerView;
NextTimerView& idx = boost::get<1>(m_Timers);
/* Wait until there is at least one timer. */
while (idx.empty())
m_CV.wait(lock);
NextTimerView::iterator it = idx.begin();
Timer::Ptr timer = it->lock();
if (!timer) {
/* Remove the timer from the list if it's not alive anymore. */
idx.erase(it);
continue;
}
double wait = timer->m_Next - Utility::GetTime();
if (wait > 0) {
/* Make sure the timer we just examined can be destroyed while we're waiting. */
timer.reset();
/* Wait for the next timer. */
m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
continue;
}
/* Remove the timer from the list so it doesn't get called again
* until the current call is completed. */
m_Timers.erase(timer);
/* Asynchronously call the timer. */
Application::GetEQ().Post(boost::bind(&Timer::Call, timer));
}
}

@ -24,6 +24,18 @@
namespace icinga {
class Timer;
/**
* @ingroup base
*/
struct TimerNextExtractor
{
typedef double result_type;
double operator()(const weak_ptr<Timer>& wtimer);
};
/**
* A timer that periodically triggers an event.
*
@ -42,23 +54,40 @@ public:
void SetInterval(double interval);
double GetInterval(void) const;
static double ProcessTimers(void);
static void AdjustTimers(double adjustment);
void Start(void);
void Stop(void);
void Reschedule(double next);
void Reschedule(double next = -1);
double GetNext(void) const;
boost::signal<void(const Timer::Ptr&)> OnTimerExpired;
signals2::signal<void(const Timer::Ptr&)> OnTimerExpired;
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
static Timer::CollectionType m_Timers;
typedef multi_index_container<
Timer::WeakPtr,
indexed_by<
ordered_unique<identity<Timer::WeakPtr> >,
ordered_non_unique<TimerNextExtractor>
>
> TimerSet;
static boost::mutex m_Mutex;
static boost::condition_variable m_CV;
static TimerSet m_Timers;
void Call(void);
static boost::once_flag m_ThreadOnce;
static void Initialize(void);
static void TimerThreadProc(void);
friend struct TimerNextExtractor;
};
}

@ -398,17 +398,11 @@ pid_t Utility::GetPid(void)
*/
void Utility::Sleep(double timeout)
{
if (Application::IsMainThread())
Application::GetMutex().unlock();
#ifndef _WIN32
usleep(timeout * 1000 * 1000);
#else /* _WIN32 */
::Sleep(timeout * 1000);
#endif /* _WIN32 */
if (Application::IsMainThread())
Application::GetMutex().lock();
}
/**
@ -521,17 +515,6 @@ bool Utility::Glob(const String& pathSpec, const function<void (const String&)>&
#endif /* _WIN32 */
}
/**
* Waits until the given predicate is true. Executes events while waiting.
*
* @param predicate The predicate.
*/
void Utility::WaitUntil(const function<bool (void)>& predicate)
{
while (!predicate())
Application::ProcessEvents();
}
#ifndef _WIN32
void Utility::SetNonBlocking(int fd)
{

@ -58,8 +58,6 @@ public:
static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
static void WaitUntil(const function<bool (void)>& predicate);
static
#ifdef _WIN32
HMODULE

@ -21,9 +21,10 @@
using namespace icinga;
boost::mutex ConfigItem::m_Mutex;
ConfigItem::ItemMap ConfigItem::m_Items;
boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
/**
* Constructor for the ConfigItem class.
@ -295,29 +296,38 @@ DynamicObject::Ptr ConfigItem::GetDynamicObject(void) const
* @param type The type of the ConfigItem that is to be looked up.
* @param name The name of the ConfigItem that is to be looked up.
* @returns The configuration item.
* @threadsafety Always.
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
ConfigItem::ItemMap::iterator it;
{
recursive_mutex::scoped_lock lockg(Application::GetMutex());
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
if (context) {
ConfigItem::Ptr item = context->GetItem(type, name);
if (context) {
ConfigItem::Ptr item = context->GetItem(type, name);
if (item)
return item;
if (item)
return item;
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if ((context->GetFlags() & CompilerLinkExisting) == 0)
return ConfigItem::Ptr();
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if ((context->GetFlags() & CompilerLinkExisting) == 0)
return ConfigItem::Ptr();
}
}
it = m_Items.find(make_pair(type, name));
{
boost::mutex::scoped_lock lock(m_Mutex);
if (it != m_Items.end())
return it->second;
ConfigItem::ItemMap::iterator it;
it = m_Items.find(make_pair(type, name));
if (it != m_Items.end())
return it->second;
}
return ConfigItem::Ptr();
}
@ -351,8 +361,13 @@ void ConfigItem::Dump(ostream& fp) const
fp << "}" << "\n";
}
/**
* @threadsafety Caller must hold the global mutex.
*/
void ConfigItem::UnloadUnit(const String& unit)
{
boost::mutex::scoped_lock lock(m_Mutex);
Logger::Write(LogInformation, "config", "Unloading config items from compilation unit '" + unit + "'");
vector<ConfigItem::Ptr> obsoleteItems;

@ -62,8 +62,8 @@ public:
static void UnloadUnit(const String& unit);
static boost::signal<void (const ConfigItem::Ptr&)> OnCommitted;
static boost::signal<void (const ConfigItem::Ptr&)> OnRemoved;
static signals2::signal<void (const ConfigItem::Ptr&)> OnCommitted;
static signals2::signal<void (const ConfigItem::Ptr&)> OnRemoved;
private:
void InternalLink(const Dictionary::Ptr& dictionary) const;
@ -89,6 +89,8 @@ private:
set<ConfigItem::WeakPtr> m_ChildObjects; /**< Instantiated items
* that inherit from this item */
static boost::mutex m_Mutex;
typedef map<pair<String, String>, ConfigItem::Ptr> ItemMap;
static ItemMap m_Items; /**< All registered configuration items. */
};

@ -78,6 +78,9 @@ void ConfigType::ValidateItem(const ConfigItem::Ptr& item) const
ValidateDictionary(attrs, ruleLists, locations);
}
/**
* @threadsafety Always.
*/
String ConfigType::LocationToString(const vector<String>& locations)
{
bool first = true;
@ -94,6 +97,9 @@ String ConfigType::LocationToString(const vector<String>& locations)
return stack;
}
/**
* @threadsafety Always.
*/
void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
const vector<TypeRuleList::Ptr>& ruleLists, vector<String>& locations)
{
@ -168,4 +174,3 @@ void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
locations.pop_back();
}
}

@ -160,6 +160,9 @@ void Expression::DumpValue(ostream& fp, int indent, const Value& value, bool inl
BOOST_THROW_EXCEPTION(runtime_error("Encountered unknown type while dumping value."));
}
/**
* @threadsafety Always.
*/
void Expression::Dump(ostream& fp, int indent) const
{
if (m_Operator == OperatorExecute) {

@ -23,6 +23,9 @@ using namespace icinga;
REGISTER_SCRIPTFUNCTION("GetAnswerToEverything", &API::GetAnswerToEverything);
/**
* @threadsafety Always.
*/
void API::GetAnswerToEverything(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
if (arguments.size() < 1)

@ -21,25 +21,42 @@
using namespace icinga;
boost::mutex CIB::m_Mutex;
RingBuffer CIB::m_ActiveChecksStatistics(15 * 60);
RingBuffer CIB::m_PassiveChecksStatistics(15 * 60);
/**
* @threadsafety Always.
*/
void CIB::UpdateActiveChecksStatistics(long tv, int num)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_ActiveChecksStatistics.InsertValue(tv, num);
}
/**
* @threadsafety Always.
*/
int CIB::GetActiveChecksStatistics(long timespan)
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_ActiveChecksStatistics.GetValues(timespan);
}
/**
* @threadsafety Always.
*/
void CIB::UpdatePassiveChecksStatistics(long tv, int num)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_PassiveChecksStatistics.InsertValue(tv, num);
}
/**
* @threadsafety Always.
*/
int CIB::GetPassiveChecksStatistics(long timespan)
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_PassiveChecksStatistics.GetValues(timespan);
}

@ -39,6 +39,7 @@ public:
static int GetPassiveChecksStatistics(long timespan);
private:
static boost::mutex m_Mutex;
static RingBuffer m_ActiveChecksStatistics;
static RingBuffer m_PassiveChecksStatistics;
};

@ -21,9 +21,13 @@
using namespace icinga;
bool I2_EXPORT ExternalCommandProcessor::m_Initialized;
map<String, ExternalCommandProcessor::Callback> I2_EXPORT ExternalCommandProcessor::m_Commands;
boost::once_flag ExternalCommandProcessor::m_InitializeOnce;
boost::mutex ExternalCommandProcessor::m_Mutex;
map<String, ExternalCommandProcessor::Callback> ExternalCommandProcessor::m_Commands;
/**
* @threadsafety Always.
*/
void ExternalCommandProcessor::Execute(const String& line)
{
if (line.IsEmpty())
@ -54,68 +58,91 @@ void ExternalCommandProcessor::Execute(const String& line)
Execute(ts, argv[0], argvExtra);
}
/**
* @threadsafety Always.
*/
void ExternalCommandProcessor::Execute(double time, const String& command, const vector<String>& arguments)
{
if (!m_Initialized) {
RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
boost::call_once(m_InitializeOnce, &ExternalCommandProcessor::Initialize);
m_Initialized = true;
Callback callback;
{
boost::mutex::scoped_lock lock(m_Mutex);
map<String, ExternalCommandProcessor::Callback>::iterator it;
it = m_Commands.find(command);
if (it == m_Commands.end())
BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
callback = it->second;
}
map<String, ExternalCommandProcessor::Callback>::iterator it;
it = m_Commands.find(command);
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
callback(time, arguments);
}
if (it == m_Commands.end())
BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
it->second(time, arguments);
}
/**
* @threadsafety Always.
*/
void ExternalCommandProcessor::Initialize(void)
{
RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
}
/**
* @threadsafety Always.
*/
void ExternalCommandProcessor::RegisterCommand(const String& command, const ExternalCommandProcessor::Callback& callback)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Commands[command] = callback;
}

@ -28,6 +28,19 @@ public:
static void Execute(const String& line);
static void Execute(double time, const String& command, const vector<String>& arguments);
private:
typedef function<void (double time, const vector<String>& arguments)> Callback;
static boost::once_flag m_InitializeOnce;
static boost::mutex m_Mutex;
static map<String, Callback> m_Commands;
ExternalCommandProcessor(void);
static void Initialize(void);
static void RegisterCommand(const String& command, const Callback& callback);
static void ProcessServiceCheckResult(double time, const vector<String>& arguments);
static void ScheduleSvcCheck(double time, const vector<String>& arguments);
static void ScheduleForcedSvcCheck(double time, const vector<String>& arguments);
@ -72,16 +85,6 @@ public:
static void DelAllSvcComments(double time, const vector<String>& arguments);
static void SendCustomHostNotification(double time, const vector<String>& arguments);
static void SendCustomSvcNotification(double time, const vector<String>& arguments);
private:
typedef function<void (double time, const vector<String>& arguments)> Callback;
static bool m_Initialized;
static map<String, Callback> m_Commands;
ExternalCommandProcessor(void);
static void RegisterCommand(const String& command, const Callback& callback);
};
}

@ -59,11 +59,17 @@ String Host::GetDisplayName(void) const
return GetName();
}
/**
* @threadsafety Always.
*/
bool Host::Exists(const String& name)
{
return (DynamicObject::GetObject("Host", name));
}
/**
* @threadsafety Always.
*/
Host::Ptr Host::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("Host", name);

@ -50,11 +50,17 @@ String HostGroup::GetActionUrl(void) const
return Get("action_url");
}
/**
* @threadsafety Always.
*/
bool HostGroup::Exists(const String& name)
{
return (DynamicObject::GetObject("HostGroup", name));
}
/**
* @threadsafety Always.
*/
HostGroup::Ptr HostGroup::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("HostGroup", name);

@ -27,7 +27,7 @@ Value MacroProcessor::ResolveMacros(const Value& cmd, const Dictionary::Ptr& mac
if (cmd.IsScalar()) {
result = InternalResolveMacros(cmd, macros);
} else {
} else if (cmd.IsObjectType<Dictionary>()) {
Dictionary::Ptr resultDict = boost::make_shared<Dictionary>();
Dictionary::Ptr dict = cmd;
@ -37,6 +37,8 @@ Value MacroProcessor::ResolveMacros(const Value& cmd, const Dictionary::Ptr& mac
}
result = resultDict;
} else {
BOOST_THROW_EXCEPTION(invalid_argument("Command is not a string or dictionary."));
}
return result;

@ -27,9 +27,12 @@ PluginCheckTask::PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr
: m_Task(task), m_Process(process)
{ }
/**
* @threadsafety Always.
*/
void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
assert(Application::IsMainThread());
recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
@ -57,8 +60,13 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
}
/**
* @threadsafety Always.
*/
void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessResult pr;
try {

@ -28,9 +28,12 @@ PluginNotificationTask::PluginNotificationTask(const ScriptTask::Ptr& task, cons
: m_Task(task), m_Process(process), m_ServiceName(service), m_Command(command)
{ }
/**
* @threadsafety Always.
*/
void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
assert(Application::IsMainThread());
recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
@ -62,8 +65,13 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
}
/**
* @threadsafety Always.
*/
void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessResult pr;
try {

@ -25,8 +25,8 @@ const int Service::DefaultMaxCheckAttempts = 3;
const int Service::DefaultCheckInterval = 5 * 60;
const int Service::CheckIntervalDivisor = 5;
boost::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
boost::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
signals2::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
signals2::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
Value Service::GetCheckCommand(void) const
{
@ -350,7 +350,7 @@ void Service::ApplyCheckResult(const Dictionary::Ptr& cr)
/* Make sure the notification component sees the updated
* state/state_type attributes. */
DynamicObject::FlushTx();
DynamicObject::NewTx();
if (IsReachable() && !IsInDowntime() && !IsAcknowledged())
RequestNotifications(NotificationStateChange);
@ -458,6 +458,8 @@ void Service::BeginExecuteCheck(const function<void (void)>& callback)
void Service::CheckCompletedHandler(const Dictionary::Ptr& scheduleInfo,
const ScriptTask::Ptr& task, const function<void (void)>& callback)
{
ObjectLock olock(this);
Set("current_task", Empty);
scheduleInfo->Set("execution_end", Utility::GetTime());
@ -521,7 +523,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
/* flush the current transaction so other instances see the service's
* new state when they receive the CheckResult message */
DynamicObject::FlushTx();
DynamicObject::NewTx();
RequestMessage rm;
rm.SetMethod("checker::CheckResult");

@ -210,6 +210,8 @@ void Service::RemoveExpiredComments(void)
void Service::CommentsExpireTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);

@ -275,6 +275,8 @@ void Service::RemoveExpiredDowntimes(void)
void Service::DowntimesExpireTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);

@ -67,11 +67,17 @@ String Service::GetDisplayName(void) const
return GetName();
}
/**
* @threadsafety Always.
*/
bool Service::Exists(const String& name)
{
return (DynamicObject::GetObject("Service", name));
}
/**
* @threadsafety Always.
*/
Service::Ptr Service::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("Service", name);
@ -82,9 +88,14 @@ Service::Ptr Service::GetByName(const String& name)
return dynamic_pointer_cast<Service>(configObject);
}
/**
* @threadsafety Always.
*/
Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
{
if (!hostName.IsEmpty()) {
recursive_mutex::scoped_lock lock(Application::GetMutex());
Host::Ptr host = Host::GetByName(hostName);
return host->GetServiceByShortName(serviceName);
} else {

@ -180,8 +180,8 @@ public:
static ServiceStateType StateTypeFromString(const String& state);
static String StateTypeToString(ServiceStateType state);
static boost::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
static boost::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
static signals2::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
static signals2::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
/* Downtimes */
static int GetNextDowntimeID(void);

@ -50,11 +50,17 @@ String ServiceGroup::GetActionUrl(void) const
return Get("action_url");
}
/**
* @threadsafety Always.
*/
bool ServiceGroup::Exists(const String& name)
{
return (DynamicObject::GetObject("ServiceGroup", name));
}
/**
* @threadsafety Always.
*/
ServiceGroup::Ptr ServiceGroup::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("ServiceGroup", name);

@ -366,7 +366,7 @@ PyObject *PythonLanguage::PyRegisterFunction(PyObject *self, PyObject *args)
}
{
boost::mutex::scoped_lock lock(Application::GetMutex());
recursive_mutex::scoped_lock lock(Application::GetMutex());
interp->RegisterPythonFunction(name, object);
}

@ -30,10 +30,10 @@ static AttributeDescription endpointAttributes[] = {
REGISTER_TYPE(Endpoint, endpointAttributes);
boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
/**
* Constructor for the Endpoint class.
@ -201,13 +201,13 @@ void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(topic);
shared_ptr<boost::signal<Endpoint::Callback> > sig;
shared_ptr<signals2::signal<Endpoint::Callback> > sig;
if (it == m_TopicHandlers.end()) {
sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
sig = boost::make_shared<signals2::signal<Endpoint::Callback> >();
m_TopicHandlers.insert(make_pair(topic, sig));
} else {
sig = it->second;
@ -271,7 +271,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
if (!request.GetMethod(&method))
return;
map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(method);
if (it == m_TopicHandlers.end())

@ -71,11 +71,11 @@ public:
static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
static signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
static signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
private:
bool m_ReceivedWelcome; /**< Have we received a welcome message
@ -83,7 +83,7 @@ private:
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
map<String, shared_ptr<signals2::signal<Callback> > > m_TopicHandlers;
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);

@ -325,6 +325,8 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
void EndpointManager::SubscriptionTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
DynamicObject::Ptr object;
@ -349,6 +351,8 @@ void EndpointManager::SubscriptionTimerHandler(void)
void EndpointManager::ReconnectTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
@ -373,6 +377,8 @@ void EndpointManager::ReconnectTimerHandler(void)
void EndpointManager::RequestTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
map<String, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
if (it->second.HasTimedOut()) {

@ -60,7 +60,7 @@ public:
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
signals2::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
private:
String m_Identity;

@ -38,7 +38,7 @@ public:
void SendMessage(const MessagePart& message);
boost::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
protected:
virtual void ProcessData(void);