diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 12b3bad69..f08436890 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -53,27 +53,44 @@ void CheckerComponent::Stop(void) void CheckerComponent::CheckTimerHandler(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + double now = Utility::GetTime(); long tasks = 0; int missedServices = 0, missedChecks = 0; - while (!m_IdleServices.empty()) { - typedef nth_index::type CheckTimeView; - CheckTimeView& idx = boost::get<1>(m_IdleServices); + for (;;) { + Service::Ptr service; - CheckTimeView::iterator it = idx.begin(); - Service::Ptr service = it->lock(); + { + boost::mutex::scoped_lock lock(m_Mutex); + + typedef nth_index::type CheckTimeView; + CheckTimeView& idx = boost::get<1>(m_IdleServices); + + if (idx.begin() == idx.end()) + break; + + CheckTimeView::iterator it = idx.begin(); + service = it->lock(); + + if (!service) { + idx.erase(it); + continue; + } + + { + ObjectLock olock(service); + + if (service->GetNextCheck() > now) + break; + } - if (!service) { idx.erase(it); - continue; } - if (service->GetNextCheck() > now) - break; - - idx.erase(it); + ObjectLock olock(service); /* reschedule the service if checks are currently disabled * for it and this is not a forced check */ @@ -83,7 +100,14 @@ void CheckerComponent::CheckTimerHandler(void) service->UpdateNextCheck(); - idx.insert(service); + { + boost::mutex::scoped_lock lock(m_Mutex); + + typedef nth_index::type CheckTimeView; + CheckTimeView& idx = boost::get<1>(m_IdleServices); + + idx.insert(service); + } continue; } @@ -136,19 +160,26 @@ void CheckerComponent::CheckTimerHandler(void) void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service) { - /* remove the service from the list of pending services; if it's not in the - * list this was a manual (i.e. forced) check and we must not re-add the - * service to the services list because it's already there. */ - CheckerComponent::ServiceSet::iterator it; - it = m_PendingServices.find(service); - if (it != m_PendingServices.end()) { - m_PendingServices.erase(it); - m_IdleServices.insert(service); + { + boost::mutex::scoped_lock lock(m_Mutex); + + /* remove the service from the list of pending services; if it's not in the + * list this was a manual (i.e. forced) check and we must not re-add the + * service to the services list because it's already there. */ + CheckerComponent::ServiceSet::iterator it; + it = m_PendingServices.find(service); + if (it != m_PendingServices.end()) { + m_PendingServices.erase(it); + m_IdleServices.insert(service); + } } RescheduleCheckTimer(); - Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'"); + { + ObjectLock olock(service); + Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'"); + } } void CheckerComponent::ResultTimerHandler(void) @@ -156,20 +187,35 @@ void CheckerComponent::ResultTimerHandler(void) Logger::Write(LogDebug, "checker", "ResultTimerHandler entered."); stringstream msgbuf; - msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size(); + + { + boost::mutex::scoped_lock lock(m_Mutex); + + msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size(); + } + Logger::Write(LogInformation, "checker", msgbuf.str()); } void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service) { - String checker = service->GetChecker(); + String checker; + + { + ObjectLock olock(service); + checker = service->GetChecker(); + } if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) { + boost::mutex::scoped_lock lock(m_Mutex); + if (m_PendingServices.find(service) != m_PendingServices.end()) return; m_IdleServices.insert(service); } else { + boost::mutex::scoped_lock lock(m_Mutex); + m_IdleServices.erase(service); m_PendingServices.erase(service); } @@ -177,16 +223,20 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service) void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service) { - /* remove and re-insert the service from the set in order to force an index update */ - typedef nth_index::type ServiceView; - ServiceView& idx = boost::get<0>(m_IdleServices); + { + boost::mutex::scoped_lock lock(m_Mutex); - ServiceView::iterator it = idx.find(service); - if (it == idx.end()) - return; + /* remove and re-insert the service from the set in order to force an index update */ + typedef nth_index::type ServiceView; + ServiceView& idx = boost::get<0>(m_IdleServices); - idx.erase(it); - idx.insert(service); + ServiceView::iterator it = idx.find(service); + if (it == idx.end()) + return; + + idx.erase(it); + idx.insert(service); + } RescheduleCheckTimer(); } @@ -199,31 +249,40 @@ void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object) if (!service) return; - m_IdleServices.erase(service); - m_PendingServices.erase(service); + { + boost::mutex::scoped_lock lock(m_Mutex); + + m_IdleServices.erase(service); + m_PendingServices.erase(service); + } } void CheckerComponent::RescheduleCheckTimer(void) { - if (m_IdleServices.empty()) - return; - - typedef nth_index::type CheckTimeView; - CheckTimeView& idx = boost::get<1>(m_IdleServices); - Service::Ptr service; - do { - CheckTimeView::iterator it = idx.begin(); + { + boost::mutex::scoped_lock lock(m_Mutex); - if (it == idx.end()) + if (m_IdleServices.empty()) return; - service = it->lock(); + typedef nth_index::type CheckTimeView; + CheckTimeView& idx = boost::get<1>(m_IdleServices); - if (!service) - idx.erase(it); - } while (!service); + do { + CheckTimeView::iterator it = idx.begin(); + if (it == idx.end()) + return; + + service = it->lock(); + + if (!service) + idx.erase(it); + } while (!service); + } + + ObjectLock olock(service); m_CheckTimer->Reschedule(service->GetNextCheck()); } diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index f930fa04b..4e4134cde 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -37,7 +37,10 @@ struct ServiceNextCheckExtractor if (!service) return 0; - return service->GetNextCheck(); + { + ObjectLock olock(service); + return service->GetNextCheck(); + } } }; @@ -64,6 +67,8 @@ public: private: Endpoint::Ptr m_Endpoint; + boost::mutex m_Mutex; + ServiceSet m_IdleServices; ServiceSet m_PendingServices; diff --git a/components/checker/i2-checker.h b/components/checker/i2-checker.h index 9b7a7f49e..f21721915 100644 --- a/components/checker/i2-checker.h +++ b/components/checker/i2-checker.h @@ -29,17 +29,6 @@ #include #include -#include -#include -#include - -using boost::multi_index_container; -using boost::multi_index::indexed_by; -using boost::multi_index::identity; -using boost::multi_index::ordered_unique; -using boost::multi_index::ordered_non_unique; -using boost::multi_index::nth_index; - #include "checkercomponent.h" #endif /* I2CHECKER_H */ diff --git a/components/compat/compatcomponent.cpp b/components/compat/compatcomponent.cpp index ff080e9ac..360a2af89 100644 --- a/components/compat/compatcomponent.cpp +++ b/components/compat/compatcomponent.cpp @@ -124,7 +124,6 @@ void CompatComponent::CommandPipeThread(const String& commandPath) } } - if (!fifo_ok && mkfifo(commandPath.CStr(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) < 0) BOOST_THROW_EXCEPTION(PosixException("mkfifo() failed", errno)); @@ -154,7 +153,12 @@ void CompatComponent::CommandPipeThread(const String& commandPath) line[strlen(line) - 1] = '\0'; String command = line; - Application::GetEQ().Post(boost::bind(&CompatComponent::ProcessCommand, this, command)); + + { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + + ProcessCommand(command); + } } fclose(fp); @@ -412,6 +416,8 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic */ void CompatComponent::StatusTimerHandler(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + Logger::Write(LogInformation, "compat", "Writing compat status information"); String statuspath = GetStatusPath(); diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index fb00b4335..4e32271a6 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -82,6 +82,8 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P void DelegationComponent::DelegationTimerHandler(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + map histogram; DynamicObject::Ptr object; diff --git a/components/demo/democomponent.cpp b/components/demo/democomponent.cpp index e2657d078..07272ad97 100644 --- a/components/demo/democomponent.cpp +++ b/components/demo/democomponent.cpp @@ -54,6 +54,8 @@ void DemoComponent::Stop(void) */ void DemoComponent::DemoTimerHandler(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + Logger::Write(LogInformation, "demo", "Sending multicast 'hello" " world' message."); diff --git a/components/notification/notificationcomponent.cpp b/components/notification/notificationcomponent.cpp index d4de1111f..a529ce536 100644 --- a/components/notification/notificationcomponent.cpp +++ b/components/notification/notificationcomponent.cpp @@ -53,6 +53,8 @@ void NotificationComponent::Stop(void) */ void NotificationComponent::NotificationTimerHandler(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + // TODO: implement } diff --git a/components/replication/replicationcomponent.cpp b/components/replication/replicationcomponent.cpp index e35a6e3e5..218ca7ada 100644 --- a/components/replication/replicationcomponent.cpp +++ b/components/replication/replicationcomponent.cpp @@ -32,7 +32,7 @@ void ReplicationComponent::Start(void) DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1)); DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1)); - DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _1)); + DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2)); Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1)); diff --git a/icinga-app/icinga.cpp b/icinga-app/icinga.cpp index 08af5900b..1dafdfe95 100644 --- a/icinga-app/icinga.cpp +++ b/icinga-app/icinga.cpp @@ -66,23 +66,6 @@ static bool LoadConfigFiles(bool validateOnly) if (hasError) return false; -/* Logger::Write(LogInformation, "icinga-app", "Validating config items..."); - DynamicType::Ptr type; - BOOST_FOREACH(tie(tuples::ignore, type), DynamicType::GetTypes()) { - ConfigType::Ptr ctype = ConfigType::GetByName(type->GetName()); - - if (!ctype) { - Logger::Write(LogWarning, "icinga-app", "No config type found for type '" + type->GetName() + "'"); - - continue; - } - - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), type->GetObjects()) { - ctype->ValidateObject(object); - } - }*/ - if (validateOnly) return true; @@ -105,8 +88,13 @@ static bool LoadConfigFiles(bool validateOnly) static void ReloadConfigTimerHandler(void) { if (g_ReloadConfig) { - Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files."); - LoadConfigFiles(false); + { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + + Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files."); + LoadConfigFiles(false); + } + g_ReloadConfig = false; } } @@ -136,10 +124,6 @@ int main(int argc, char **argv) lt_dlinit(); #endif /* _WIN32 */ - /* This must be done before calling any other functions - * in the base library. */ - Application::SetMainThread(); - /* Set command-line arguments. */ Application::SetArgC(argc); Application::SetArgV(argv); @@ -252,14 +236,14 @@ int main(int argc, char **argv) return EXIT_FAILURE; } - DynamicObject::BeginTx(); + DynamicObject::NewTx(); bool validateOnly = g_AppParams.count("validate"); if (!LoadConfigFiles(validateOnly)) return EXIT_FAILURE; - DynamicObject::FinishTx(); + DynamicObject::NewTx(); if (validateOnly) { Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate."); @@ -290,4 +274,3 @@ int main(int argc, char **argv) return app->Run(); } - diff --git a/lib/base/application.cpp b/lib/base/application.cpp index 531a8168d..ab91ce35b 100644 --- a/lib/base/application.cpp +++ b/lib/base/application.cpp @@ -21,7 +21,7 @@ using namespace icinga; -boost::mutex Application::m_Mutex; +recursive_mutex Application::m_Mutex; Application *Application::m_Instance = NULL; bool Application::m_ShuttingDown = false; bool Application::m_Debugging = false; @@ -110,25 +110,26 @@ void Application::SetArgV(char **argv) m_ArgV = argv; } -/** - * Runs one iteration of the event loop. - * - * @returns false if we're shutting down, true otherwise. - */ -bool Application::ProcessEvents(void) +void Application::NewTxTimerHandler(void) { - Object::ClearHeldObjects(); + DynamicObject::NewTx(); +} - double sleep = Timer::ProcessTimers(); +#ifdef _DEBUG +void Application::ProfileTimerHandler(void) +{ + stringstream msgbuf; + msgbuf << "Active objects: " << Object::GetAliveObjectsCount(); + Logger::Write(LogInformation, "base", msgbuf.str()); + Object::PrintMemoryProfile(); +} +#endif /* _DEBUG */ + +void Application::ShutdownTimerHandler(void) +{ if (m_ShuttingDown) - return false; - - GetEQ().ProcessEvents(m_Mutex, boost::posix_time::milliseconds(sleep * 1000)); - - DynamicObject::FlushTx(); - - return true; + m_EQ.Stop(); } /** @@ -137,32 +138,31 @@ bool Application::ProcessEvents(void) */ void Application::RunEventLoop(void) const { - boost::mutex::scoped_lock lock(m_Mutex); - -#ifdef _DEBUG - double nextProfile = 0; -#endif /* _DEBUG */ - /* Start the system time watch thread. */ thread t(&Application::TimeWatchThreadProc); t.detach(); - while (!m_ShuttingDown) { - if (!ProcessEvents()) - break; + /* Set up a timer to periodically flush the tx. */ + Timer::Ptr newTxTimer = boost::make_shared(); + newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler)); + newTxTimer->SetInterval(0.5); + newTxTimer->Start(); + + /* Set up a timer that watches the m_Shutdown flag. */ + Timer::Ptr shutdownTimer = boost::make_shared(); + shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler)); + shutdownTimer->SetInterval(0.5); + shutdownTimer->Start(); #ifdef _DEBUG - if (nextProfile < Utility::GetTime()) { - stringstream msgbuf; - msgbuf << "Active objects: " << Object::GetAliveObjectsCount(); - Logger::Write(LogInformation, "base", msgbuf.str()); - - Object::PrintMemoryProfile(); - - nextProfile = Utility::GetTime() + 15.0; - } + /* Set up a timer that periodically prints some information about the object system. */ + Timer::Ptr profileTimer = boost::make_shared(); + profileTimer->OnTimerExpired.connect(boost::bind(&Application::ProfileTimerHandler)); + flushTxTimer->SetInterval(15); + flushTxTimer->Start(); #endif /* _DEBUG */ - } + + GetEQ().Run(); } /** @@ -186,12 +186,7 @@ void Application::TimeWatchThreadProc(void) << " in time: " << abs(timeDiff) << " seconds"; Logger::Write(LogInformation, "base", msgbuf.str()); - /* in addition to rescheduling the timers this - * causes the event loop to wake up thereby - * solving the problem that timed_wait() - * uses an absolute timestamp for the timeout */ - GetEQ().Post(boost::bind(&Timer::AdjustTimers, - -timeDiff)); + Timer::AdjustTimers(-timeDiff); } lastLoop = now; @@ -302,25 +297,6 @@ bool Application::IsDebugging(void) return m_Debugging; } -/** - * Checks whether we're currently on the main thread. - * - * @returns true if this is the main thread, false otherwise - */ -bool Application::IsMainThread(void) -{ - return (boost::this_thread::get_id() == m_MainThreadID); -} - -/** - * Sets the main thread to the currently running thread. - */ -void Application::SetMainThread(void) -{ - m_MainThreadID = boost::this_thread::get_id(); - m_EQ.SetOwner(m_MainThreadID); -} - /** * Displays a message that tells users what to do when they encounter a bug. */ @@ -455,11 +431,11 @@ int Application::Run(void) SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE); #endif /* _WIN32 */ - DynamicObject::BeginTx(); + DynamicObject::NewTx(); result = Main(); - DynamicObject::FinishTx(); + DynamicObject::NewTx(); DynamicObject::DeactivateObjects(); return result; @@ -594,11 +570,11 @@ void Application::SetPkgDataDir(const String& path) } /** - * Returns the global mutex for the main thread. + * Returns the global mutex. * * @returns The mutex. */ -boost::mutex& Application::GetMutex(void) +recursive_mutex& Application::GetMutex(void) { return m_Mutex; } diff --git a/lib/base/application.h b/lib/base/application.h index 358a8ee2e..f816f7b0e 100644 --- a/lib/base/application.h +++ b/lib/base/application.h @@ -62,9 +62,6 @@ public: static void SetDebugging(bool debug); static bool IsDebugging(void); - static bool IsMainThread(void); - static void SetMainThread(void); - void UpdatePidFile(const String& filename); void ClosePidFile(void); @@ -82,9 +79,7 @@ public: static String GetPkgDataDir(void); static void SetPkgDataDir(const String& path); - static bool ProcessEvents(void); - - static boost::mutex& GetMutex(void); + static recursive_mutex& GetMutex(void); static EventQueue& GetEQ(void); @@ -92,7 +87,7 @@ protected: void RunEventLoop(void) const; private: - static boost::mutex m_Mutex; /**< The main thread mutex. */ + static recursive_mutex m_Mutex; /**< The global mutex. */ static Application *m_Instance; /**< The application instance. */ static bool m_ShuttingDown; /**< Whether the application is in the process of @@ -120,6 +115,11 @@ private: static void ExceptionHandler(void); static void TimeWatchThreadProc(void); + static void NewTxTimerHandler(void); +#ifdef _DEBUG + static void ProfileTimerHandler(void) +#endif /* _DEBUG */ + static void ShutdownTimerHandler(void); }; } diff --git a/lib/base/asynctask.h b/lib/base/asynctask.h index ca0f8b3d6..a1941866b 100644 --- a/lib/base/asynctask.h +++ b/lib/base/asynctask.h @@ -18,7 +18,7 @@ ******************************************************************************/ #ifndef ASYNCTASK_H -#define ASYNCTASK_H +#define ASYNCTASK_H namespace icinga { @@ -79,6 +79,7 @@ public: */ bool IsFinished(void) const { + boost::mutex::scoped_lock lock(m_Mutex); return m_Finished; } @@ -133,7 +134,9 @@ public: */ void Wait(void) { - Utility::WaitUntil(boost::bind(&AsyncTask::IsFinished, this)); + boost::mutex::scoped_lock lock(m_Mutex); + while (!m_Finished) + m_CV.wait(lock); } protected: @@ -151,9 +154,14 @@ private: */ void FinishInternal(void) { - assert(!m_Finished); + { + boost::mutex::scoped_lock lock(m_Mutex); + assert(!m_Finished); - m_Finished = true; + m_Finished = true; + + m_CV.notify_all(); + } if (!m_CompletionCallback.empty()) { m_CompletionCallback(GetSelf()); @@ -164,6 +172,8 @@ private: } } + mutable boost::mutex m_Mutex; + boost::condition_variable m_CV; CompletionCallback m_CompletionCallback; /**< The completion callback. */ TResult m_Result; /**< The task's result. */ boost::exception_ptr m_Exception; /**< The task's exception. */ diff --git a/lib/base/component.cpp b/lib/base/component.cpp index f60ed4f6c..86074b7c7 100644 --- a/lib/base/component.cpp +++ b/lib/base/component.cpp @@ -29,8 +29,6 @@ REGISTER_TYPE(Component, NULL); Component::Component(const Dictionary::Ptr& properties) : DynamicObject(properties) { - assert(Application::IsMainThread()); - if (!IsLocal()) BOOST_THROW_EXCEPTION(runtime_error("Component objects must be local.")); diff --git a/lib/base/connection.h b/lib/base/connection.h index 4e48d4081..8167e9a73 100644 --- a/lib/base/connection.h +++ b/lib/base/connection.h @@ -35,7 +35,7 @@ public: void Close(void); - boost::signal OnClosed; + signals2::signal OnClosed; protected: virtual void ProcessData(void) = 0; diff --git a/lib/base/dynamicobject.cpp b/lib/base/dynamicobject.cpp index 10ae87196..4583805c7 100644 --- a/lib/base/dynamicobject.cpp +++ b/lib/base/dynamicobject.cpp @@ -23,10 +23,11 @@ using namespace icinga; double DynamicObject::m_CurrentTx = 0; set DynamicObject::m_ModifiedObjects; +boost::mutex DynamicObject::m_ModifiedObjectsMutex; -boost::signal DynamicObject::OnRegistered; -boost::signal DynamicObject::OnUnregistered; -boost::signal&)> DynamicObject::OnTransactionClosing; +signals2::signal DynamicObject::OnRegistered; +signals2::signal DynamicObject::OnUnregistered; +signals2::signal&)> DynamicObject::OnTransactionClosing; DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) : m_ConfigTx(0) @@ -47,8 +48,12 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) ApplyUpdate(serializedObject, Attribute_Config); } +/* + * @threadsafety Always. + */ DynamicObject::~DynamicObject(void) { + boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex); m_ModifiedObjects.erase(this); } @@ -193,7 +198,10 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data, if (tt.first->second.Type & Attribute_Config) m_ConfigTx = tx; - m_ModifiedObjects.insert(this); + { + boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex); + m_ModifiedObjects.insert(this); + } /* Use insert() rather than [] so we don't overwrite * an existing oldValue if the attribute was previously @@ -272,7 +280,7 @@ String DynamicObject::GetSource(void) const void DynamicObject::Register(void) { - assert(Application::IsMainThread()); + recursive_mutex::scoped_lock lock(Application::GetMutex()); DynamicType::Ptr dtype = GetType(); @@ -294,7 +302,7 @@ void DynamicObject::Start(void) void DynamicObject::Unregister(void) { - assert(Application::IsMainThread()); + recursive_mutex::scoped_lock lock(Application::GetMutex()); DynamicType::Ptr dtype = GetType(); @@ -331,8 +339,13 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method, return task; } +/* + * @threadsafety Always. + */ void DynamicObject::DumpObjects(const String& filename) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'"); String tempFilename = filename + ".tmp"; @@ -391,8 +404,13 @@ void DynamicObject::DumpObjects(const String& filename) BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno)); } +/* + * @threadsafety Always. + */ void DynamicObject::RestoreObjects(const String& filename) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'"); std::fstream fp; @@ -437,8 +455,13 @@ void DynamicObject::RestoreObjects(const String& filename) Logger::Write(LogDebug, "base", msgbuf.str()); } +/* + * @threadsafety Always. + */ void DynamicObject::DeactivateObjects(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + DynamicType::TypeMap::iterator tt; for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) { DynamicType::NameMap::iterator nt; @@ -451,34 +474,42 @@ void DynamicObject::DeactivateObjects(void) } } +/* + * @threadsafety Always. + */ double DynamicObject::GetCurrentTx(void) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + assert(m_CurrentTx != 0); return m_CurrentTx; } -void DynamicObject::BeginTx(void) +/* + * @threadsafety Always. + */ +void DynamicObject::NewTx(void) { - m_CurrentTx = Utility::GetTime(); -} + set objects; -void DynamicObject::FinishTx(void) -{ - BOOST_FOREACH(DynamicObject *object, m_ModifiedObjects) { + { + boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex); + + /* Some objects may accidentally bleed into the next transaction because + * we're not holding the global mutex while "stealing" the modified objects, + * but that's entirely ok. */ + m_ModifiedObjects.swap(objects); + } + + recursive_mutex::scoped_lock lock(Application::GetMutex()); + + BOOST_FOREACH(DynamicObject *object, objects) { object->SendLocalUpdateEvents(); } - OnTransactionClosing(m_ModifiedObjects); - m_ModifiedObjects.clear(); - - m_CurrentTx = 0; -} - -void DynamicObject::FlushTx(void) -{ - FinishTx(); - BeginTx(); + OnTransactionClosing(m_CurrentTx, objects); + m_CurrentTx = Utility::GetTime(); } void DynamicObject::OnInitCompleted(void) @@ -487,8 +518,13 @@ void DynamicObject::OnInitCompleted(void) void DynamicObject::OnAttributeChanged(const String&, const Value&) { } +/* + * @threadsafety Always. + */ DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name) { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + DynamicType::Ptr dtype = DynamicType::GetByName(type); return dtype->GetObject(name); } diff --git a/lib/base/dynamicobject.h b/lib/base/dynamicobject.h index 8e02d075d..24377f3bd 100644 --- a/lib/base/dynamicobject.h +++ b/lib/base/dynamicobject.h @@ -94,9 +94,9 @@ public: void ClearAttributesByType(DynamicAttributeType type); - static boost::signal OnRegistered; - static boost::signal OnUnregistered; - static boost::signal&)> OnTransactionClosing; + static signals2::signal OnRegistered; + static signals2::signal OnUnregistered; + static signals2::signal&)> OnTransactionClosing; ScriptTask::Ptr InvokeMethod(const String& method, const vector& arguments, ScriptTask::CompletionCallback callback); @@ -127,9 +127,7 @@ public: static void DeactivateObjects(void); static double GetCurrentTx(void); - static void BeginTx(void); - static void FinishTx(void); - static void FlushTx(void); + static void NewTx(void); protected: virtual void OnInitCompleted(void); @@ -149,6 +147,7 @@ private: /* This has to be a set of raw pointers because the DynamicObject * constructor has to be able to insert objects into this list. */ static set m_ModifiedObjects; + static boost::mutex m_ModifiedObjectsMutex; friend class DynamicType; /* for OnInitCompleted. */ }; diff --git a/lib/base/dynamictype.cpp b/lib/base/dynamictype.cpp index 75ec7fae6..1e40c0681 100644 --- a/lib/base/dynamictype.cpp +++ b/lib/base/dynamictype.cpp @@ -21,12 +21,19 @@ using namespace icinga; +boost::mutex DynamicType::m_Mutex; + DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory) : m_Name(name), m_ObjectFactory(factory) { } +/** + * @threadsafety Always. + */ DynamicType::Ptr DynamicType::GetByName(const String& name) { + boost::mutex::scoped_lock lock(m_Mutex); + DynamicType::TypeMap::const_iterator tt = GetTypes().find(name); if (tt == GetTypes().end()) @@ -35,12 +42,18 @@ DynamicType::Ptr DynamicType::GetByName(const String& name) return tt->second; } +/** + * @threadsafety Caller must hold DynamicType::m_Mutex while using the map. + */ DynamicType::TypeMap& DynamicType::GetTypes(void) { static DynamicType::TypeMap types; return types; } +/** + * @threadsafety Caller must hold DynamicType::m_Mutex while using the map. + */ DynamicType::NameMap& DynamicType::GetObjects(void) { return m_Objects; @@ -71,9 +84,16 @@ DynamicObject::Ptr DynamicType::GetObject(const String& name) const return nt->second; } +/** + * @threadsafety Always. + */ void DynamicType::RegisterType(const DynamicType::Ptr& type) { - if (GetByName(type->GetName())) + boost::mutex::scoped_lock lock(m_Mutex); + + DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName()); + + if (tt != GetTypes().end()) BOOST_THROW_EXCEPTION(runtime_error("Cannot register class for type '" + type->GetName() + "': Objects of this type already exist.")); @@ -99,6 +119,9 @@ DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUp return obj; } +/** + * @threadsafety Always. + */ bool DynamicType::TypeExists(const String& name) { return (GetByName(name)); diff --git a/lib/base/dynamictype.h b/lib/base/dynamictype.h index b2bbd6e6b..d97a6c660 100644 --- a/lib/base/dynamictype.h +++ b/lib/base/dynamictype.h @@ -47,15 +47,15 @@ public: static void RegisterType(const DynamicType::Ptr& type); static bool TypeExists(const String& name); - + DynamicObject::Ptr CreateObject(const Dictionary::Ptr& serializedUpdate) const; DynamicObject::Ptr GetObject(const String& name) const; void RegisterObject(const DynamicObject::Ptr& object); void UnregisterObject(const DynamicObject::Ptr& object); - static TypeMap& GetTypes(void); - NameMap& GetObjects(void); + /* TODO(thread) make private */ static TypeMap& GetTypes(void); + /* TODO(thread) make private */ NameMap& GetObjects(void); void AddAttribute(const String& name, DynamicAttributeType type); void RemoveAttribute(const String& name); @@ -64,6 +64,7 @@ public: void AddAttributes(const AttributeDescription *attributes, int attributeCount); private: + static boost::mutex m_Mutex; String m_Name; ObjectFactory m_ObjectFactory; map m_Attributes; diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index a236e1cf1..0dc4f40b8 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -21,91 +21,95 @@ using namespace icinga; +/** + * @threadsafety Always. + */ EventQueue::EventQueue(void) : m_Stopped(false) { } -boost::thread::id EventQueue::GetOwner(void) const +/** + * @threadsafety Always. + */ +EventQueue::~EventQueue(void) { - return m_Owner; -} - -void EventQueue::SetOwner(boost::thread::id owner) -{ - m_Owner = owner; + Stop(); } +/** + * @threadsafety Always. + */ void EventQueue::Stop(void) { boost::mutex::scoped_lock lock(m_Mutex); m_Stopped = true; - m_EventAvailable.notify_all(); + m_CV.notify_all(); } /** - * Waits for events using the specified timeout value and processes - * them. + * Spawns worker threads and waits for them to complete. * - * @param mtx The mutex that should be unlocked while waiting. Caller - * must have this mutex locked. - * @param timeout The wait timeout. - * @returns false if the queue has been stopped, true otherwise. + * @threadsafety Always. */ -bool EventQueue::ProcessEvents(boost::mutex& mtx, millisec timeout) +void EventQueue::Run(void) { - vector events; + thread_group threads; - mtx.unlock(); + int cpus = thread::hardware_concurrency(); - { - boost::mutex::scoped_lock lock(m_Mutex); + if (cpus == 0) + cpus = 4; - while (m_Events.empty() && !m_Stopped) { - if (!m_EventAvailable.timed_wait(lock, timeout)) { - mtx.lock(); + for (int i = 0; i < cpus * 4; i++) + threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this)); - return !m_Stopped; + threads.join_all(); +} + +/** + * Waits for events and processes them. + * + * @threadsafety Always. + */ +void EventQueue::QueueThreadProc(void) +{ + while (!m_Stopped) { + vector events; + + { + boost::mutex::scoped_lock lock(m_Mutex); + + while (m_Events.empty() && !m_Stopped) + m_CV.wait(lock); + + events.swap(m_Events); + } + + BOOST_FOREACH(const Callback& ev, events) { + double st = Utility::GetTime(); + + ev(); + + double et = Utility::GetTime(); + + if (et - st > 1.0) { + stringstream msgbuf; + msgbuf << "Event call took " << et - st << " seconds."; + Logger::Write(LogWarning, "base", msgbuf.str()); } } - - events.swap(m_Events); } - - mtx.lock(); - - BOOST_FOREACH(const Callback& ev, events) { - double st = Utility::GetTime(); - - ev(); - - double et = Utility::GetTime(); - - if (et - st > 1.0) { - stringstream msgbuf; - msgbuf << "Event call took " << et - st << " seconds."; - Logger::Write(LogWarning, "base", msgbuf.str()); - } - } - - return !m_Stopped; } /** - * Appends an event to the event queue. Events will be processed in FIFO - * order on the main thread. + * Appends an event to the event queue. Events will be processed in FIFO order. * * @param callback The callback function for the event. + * @threadsafety Always. */ void EventQueue::Post(const EventQueue::Callback& callback) { - if (boost::this_thread::get_id() == m_Owner) { - callback(); - return; - } - - { - boost::mutex::scoped_lock lock(m_Mutex); - m_Events.push_back(callback); - m_EventAvailable.notify_all(); - } + boost::mutex::scoped_lock lock(m_Mutex); + m_Events.push_back(callback); + m_CV.notify_all(); } diff --git a/lib/base/eventqueue.h b/lib/base/eventqueue.h index 422746482..dec61d21c 100644 --- a/lib/base/eventqueue.h +++ b/lib/base/eventqueue.h @@ -34,24 +34,23 @@ public: typedef function Callback; EventQueue(void); + ~EventQueue(void); - bool ProcessEvents(boost::mutex& mtx, millisec timeout = boost::posix_time::milliseconds(30000)); + void Run(void); void Post(const Callback& callback); void Stop(void); - boost::thread::id GetOwner(void) const; - void SetOwner(boost::thread::id owner); - - boost::mutex& GetMutex(void); - private: boost::thread::id m_Owner; boost::mutex m_Mutex; + condition_variable m_CV; + bool m_Stopped; vector m_Events; - condition_variable m_EventAvailable; + + void QueueThreadProc(void); }; } diff --git a/lib/base/i2-base.h b/lib/base/i2-base.h index 8d10d41cf..04e9684f9 100644 --- a/lib/base/i2-base.h +++ b/lib/base/i2-base.h @@ -125,7 +125,7 @@ using std::type_info; #include #include #include -#include +#include #include #include #include @@ -139,6 +139,9 @@ using std::type_info; #include #include #include +#include +#include +#include using boost::shared_ptr; using boost::weak_ptr; @@ -148,6 +151,7 @@ using boost::static_pointer_cast; using boost::function; using boost::thread; using boost::thread_group; +using boost::recursive_mutex; using boost::condition_variable; using boost::system_time; using boost::posix_time::millisec; @@ -155,11 +159,18 @@ using boost::tie; using boost::rethrow_exception; using boost::current_exception; using boost::diagnostic_information; +using boost::multi_index_container; +using boost::multi_index::indexed_by; +using boost::multi_index::identity; +using boost::multi_index::ordered_unique; +using boost::multi_index::ordered_non_unique; +using boost::multi_index::nth_index; namespace tuples = boost::tuples; +namespace signals2 = boost::signals2; #if defined(__APPLE__) && defined(__MACH__) -# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" #endif #include diff --git a/lib/base/logger.cpp b/lib/base/logger.cpp index 42b0551db..b59bbf79e 100644 --- a/lib/base/logger.cpp +++ b/lib/base/logger.cpp @@ -81,7 +81,10 @@ void Logger::Write(LogSeverity severity, const String& facility, entry.Facility = facility; entry.Message = message; - Application::GetEQ().Post(boost::bind(&Logger::ForwardLogEntry, entry)); + { + recursive_mutex::scoped_lock lock(Application::GetMutex()); + ForwardLogEntry(entry); + } } /** @@ -182,4 +185,3 @@ DynamicObject::Ptr ILogger::GetConfig(void) const { return m_Config->GetSelf(); } - diff --git a/lib/base/netstring.cpp b/lib/base/netstring.cpp index 692c7754e..dc95cf4c3 100644 --- a/lib/base/netstring.cpp +++ b/lib/base/netstring.cpp @@ -29,6 +29,7 @@ using namespace icinga; * @returns true if a complete String was read from the IOQueue, false otherwise. * @exception invalid_argument The input stream is invalid. * @see https://github.com/PeterScott/netString-c/blob/master/netString.c + * @threadsafety Always. */ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) { @@ -110,6 +111,7 @@ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) * * @param stream The stream. * @param str The String that is to be written. + * @threadsafety Always. */ void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str) { diff --git a/lib/base/object.cpp b/lib/base/object.cpp index 10b591b52..aca652d88 100644 --- a/lib/base/object.cpp +++ b/lib/base/object.cpp @@ -25,43 +25,13 @@ using namespace icinga; * Default constructor for the Object class. */ Object::Object(void) -{ -#ifdef _DEBUG - boost::mutex::scoped_lock lock(*GetMutex()); - GetAliveObjects()->insert(this); -#endif /* _DEBUG */ -} +{ } /** * Destructor for the Object class. */ Object::~Object(void) -{ -#ifdef _DEBUG - boost::mutex::scoped_lock lock(*GetMutex()); - GetAliveObjects()->erase(this); -#endif /* _DEBUG */ -} - -/** - * Temporarily holds onto a reference for an object. This can - * be used to safely clear the last reference to an object - * in an event handler. - */ -void Object::Hold(void) -{ - boost::mutex::scoped_lock lock(*GetMutex()); - GetHeldObjects().push_back(GetSelf()); -} - -/** - * Clears all temporarily held objects. - */ -void Object::ClearHeldObjects(void) -{ - boost::mutex::scoped_lock lock(*GetMutex()); - GetHeldObjects().clear(); -} +{ } /** * Returns a reference-counted pointer to this object. @@ -73,91 +43,14 @@ Object::SharedPtrHolder Object::GetSelf(void) return Object::SharedPtrHolder(shared_from_this()); } -#ifdef _DEBUG /** - * Retrieves the number of currently alive objects. + * Returns the mutex that must be held while calling non-static methods + * which have not been explicitly marked as thread-safe. * - * @returns The number of alive objects. + * @returns The object's mutex. + * @threadsafety Always. */ -int Object::GetAliveObjectsCount(void) +recursive_mutex& Object::GetMutex(void) { - boost::mutex::scoped_lock lock(*GetMutex()); - return GetAliveObjects()->size(); + return m_Mutex; } - -/** - * Dumps a memory histogram to the "dictionaries.dump" file. - */ -void Object::PrintMemoryProfile(void) -{ - map types; - - ofstream dictfp("dictionaries.dump.tmp"); - - { - boost::mutex::scoped_lock lock(*GetMutex()); - set::iterator it; - BOOST_FOREACH(Object *obj, *GetAliveObjects()) { - pair::iterator, bool> tt; - tt = types.insert(make_pair(Utility::GetTypeName(typeid(*obj)), 1)); - if (!tt.second) - tt.first->second++; - - if (typeid(*obj) == typeid(Dictionary)) { - Dictionary::Ptr dict = obj->GetSelf(); - dictfp << Value(dict).Serialize() << std::endl; - } - } - } - -#ifdef _WIN32 - _unlink("dictionaries.dump"); -#endif /* _WIN32 */ - - dictfp.close(); - if (rename("dictionaries.dump.tmp", "dictionaries.dump") < 0) - BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno)); - - String type; - int count; - BOOST_FOREACH(tie(type, count), types) { - std::cerr << type << ": " << count << std::endl; - } -} - -/** - * Returns currently active objects. - * - * @returns currently active objects - */ -set *Object::GetAliveObjects(void) -{ - static set *aliveObjects = new set(); - return aliveObjects; -} -#endif /* _DEBUG */ - -/** - * Returns the mutex used for accessing static members. - * - * @returns a mutex - */ -boost::mutex *Object::GetMutex(void) -{ - static boost::mutex *mutex = new boost::mutex(); - return mutex; -} - -/** - * Returns currently held objects. The caller must be - * holding the mutex returned by GetMutex(). - * - * @returns currently held objects - */ -vector& Object::GetHeldObjects(void) -{ - static vector heldObjects; - return heldObjects; -} - - diff --git a/lib/base/object.h b/lib/base/object.h index fa0fb4c95..c51c05fb6 100644 --- a/lib/base/object.h +++ b/lib/base/object.h @@ -31,15 +31,12 @@ class SharedPtrHolder; * * @ingroup base */ -class I2_BASE_API Object : public enable_shared_from_this, public boost::signals::trackable +class I2_BASE_API Object : public enable_shared_from_this { public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void Hold(void); - static void ClearHeldObjects(void); - /** * Holds a shared pointer and provides support for implicit upcasts. * @@ -96,10 +93,7 @@ public: SharedPtrHolder GetSelf(void); -#ifdef _DEBUG - static int GetAliveObjectsCount(void); - static void PrintMemoryProfile(void); -#endif /* _DEBUG */ + recursive_mutex& GetMutex(void); protected: Object(void); @@ -109,9 +103,24 @@ private: Object(const Object& other); Object& operator=(const Object& rhs); - static boost::mutex *GetMutex(void); - static set *GetAliveObjects(void); - static vector& GetHeldObjects(void); + recursive_mutex m_Mutex; +}; + +/** + * A scoped lock for Objects. + */ +struct ObjectLock { +public: + ObjectLock(const Object::Ptr& object) + : m_Lock(object->GetMutex()) + { } + + ObjectLock(Object *object) + : m_Lock(object->GetMutex()) + { } + +private: + recursive_mutex::scoped_lock m_Lock; }; /** diff --git a/lib/base/process-unix.cpp b/lib/base/process-unix.cpp index d3306e7ee..2331e37c9 100644 --- a/lib/base/process-unix.cpp +++ b/lib/base/process-unix.cpp @@ -26,7 +26,7 @@ using namespace icinga; int Process::m_TaskFd; extern char **environ; -void Process::CreateWorkers(void) +void Process::Initialize(void) { int fds[2]; diff --git a/lib/base/process-windows.cpp b/lib/base/process-windows.cpp index d6db2df03..3dace62fe 100644 --- a/lib/base/process-windows.cpp +++ b/lib/base/process-windows.cpp @@ -22,7 +22,7 @@ using namespace icinga; -void Process::CreateWorkers(void) +void Process::Initialize(void) { // TODO: implement } diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 229a4d7d5..d959c0102 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -21,20 +21,14 @@ using namespace icinga; -bool Process::m_WorkersCreated = false; +boost::once_flag Process::m_ThreadOnce; boost::mutex Process::m_Mutex; deque Process::m_Tasks; Process::Process(const vector& arguments, const Dictionary::Ptr& extraEnvironment) : AsyncTask(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment) { - assert(Application::IsMainThread()); - - if (!m_WorkersCreated) { - CreateWorkers(); - - m_WorkersCreated = true; - } + boost::call_once(&Process::Initialize, m_ThreadOnce); #ifndef _WIN32 m_FD = -1; diff --git a/lib/base/process.h b/lib/base/process.h index 2a14c16fc..618acfaa4 100644 --- a/lib/base/process.h +++ b/lib/base/process.h @@ -54,8 +54,6 @@ public: static vector SplitCommand(const Value& command); private: - static bool m_WorkersCreated; - vector m_Arguments; Dictionary::Ptr m_ExtraEnvironment; @@ -76,7 +74,6 @@ private: static int m_TaskFd; #endif /* _WIN32 */ - static void CreateWorkers(void); static void NotifyWorker(void); void SpawnTask(void); @@ -89,6 +86,9 @@ private: void InitTask(void); bool RunTask(void); + + static boost::once_flag m_ThreadOnce; + static void Initialize(void); }; } diff --git a/lib/base/script.cpp b/lib/base/script.cpp index abf1a6c97..0bcc5ca77 100644 --- a/lib/base/script.cpp +++ b/lib/base/script.cpp @@ -32,12 +32,6 @@ Script::Script(const Dictionary::Ptr& properties) : DynamicObject(properties) { } -Script::~Script(void) -{ - if (m_Interpreter) - m_Interpreter->Stop(); -} - void Script::OnInitCompleted(void) { SpawnInterpreter(); @@ -63,10 +57,6 @@ void Script::SpawnInterpreter(void) { Logger::Write(LogInformation, "base", "Reloading script '" + GetName() + "'"); - if (m_Interpreter) - m_Interpreter->Stop(); - ScriptLanguage::Ptr language = ScriptLanguage::GetByName(GetLanguage()); m_Interpreter = language->CreateInterpreter(GetSelf()); - m_Interpreter->Start(); } diff --git a/lib/base/script.h b/lib/base/script.h index 3cae28e3a..daaf288e1 100644 --- a/lib/base/script.h +++ b/lib/base/script.h @@ -37,7 +37,6 @@ public: typedef weak_ptr