mirror of https://github.com/Icinga/icinga2.git
Merge pull request #9980 from Icinga/config-sync-conflicts
Process `config::update/delete` cluster events gracefully
This commit is contained in:
commit
04ef105caa
|
@ -7,6 +7,7 @@
|
|||
#include "base/configtype.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "config/vmops.hpp"
|
||||
#include <fstream>
|
||||
|
||||
|
@ -104,6 +105,15 @@ Value ApiListener::ConfigUpdateObjectAPIHandler(const MessageOrigin::Ptr& origin
|
|||
return Empty;
|
||||
}
|
||||
|
||||
// Wait for the object name to become available for processing and block it immediately.
|
||||
// Doing so guarantees that only one cluster event (create/update/delete) of a given
|
||||
// object is being processed at any given time.
|
||||
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);
|
||||
|
||||
Defer unlockAndNotify([&listener, &ptype, &objName]{
|
||||
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
|
||||
});
|
||||
|
||||
ConfigObject::Ptr object = ctype->GetObject(objName);
|
||||
|
||||
String config = params->Get("config");
|
||||
|
@ -258,6 +268,15 @@ Value ApiListener::ConfigDeleteObjectAPIHandler(const MessageOrigin::Ptr& origin
|
|||
return Empty;
|
||||
}
|
||||
|
||||
// Wait for the object name to become available for processing and block it immediately.
|
||||
// Doing so guarantees that only one cluster event (create/update/delete) of a given
|
||||
// object is being processed at any given time.
|
||||
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);
|
||||
|
||||
Defer unlockAndNotify([&listener, &ptype, &objName]{
|
||||
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
|
||||
});
|
||||
|
||||
ConfigObject::Ptr object = ctype->GetObject(objName);
|
||||
|
||||
if (!object) {
|
||||
|
@ -462,3 +481,37 @@ void ApiListener::SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient
|
|||
Log(LogInformation, "ApiListener")
|
||||
<< "Finished syncing runtime objects to endpoint '" << endpoint->GetName() << "'.";
|
||||
}
|
||||
|
||||
/**
|
||||
* Locks the specified object name of the given type. If it is already locked, the call blocks until the lock is released.
|
||||
*
|
||||
* @param Type::Ptr ptype The type of the object you want to lock
|
||||
* @param String objName The object name you want to lock
|
||||
*/
|
||||
void ObjectNameMutex::Lock(const Type::Ptr& ptype, const String& objName)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_Mutex);
|
||||
m_CV.wait(lock, [this, &ptype, &objName]{
|
||||
auto& locked = m_LockedObjectNames[ptype.get()];
|
||||
return locked.find(objName) == locked.end();
|
||||
});
|
||||
|
||||
// Add object name to the locked list again to block all other threads that try
|
||||
// to process a message affecting the same object.
|
||||
m_LockedObjectNames[ptype.get()].emplace(objName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unlocks the specified object name of the given type.
|
||||
*
|
||||
* @param Type::Ptr ptype The type of the object you want to unlock
|
||||
* @param String objName The name of the object you want to unlock
|
||||
*/
|
||||
void ObjectNameMutex::Unlock(const Type::Ptr& ptype, const String& objName)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_Mutex);
|
||||
m_LockedObjectNames[ptype.get()].erase(objName);
|
||||
}
|
||||
m_CV.notify_all();
|
||||
}
|
||||
|
|
|
@ -71,6 +71,26 @@ enum class ApiCapabilities : uint_fast64_t
|
|||
IfwApiCheckCommand = 1u << 1u,
|
||||
};
|
||||
|
||||
/**
|
||||
* Allows you to easily lock/unlock a specific object of a given type by its name.
|
||||
*
|
||||
* That way, locking an object "this" of type Host does not affect an object "this" of
|
||||
* type "Service" nor an object "other" of type "Host".
|
||||
*
|
||||
* @ingroup remote
|
||||
*/
|
||||
class ObjectNameMutex
|
||||
{
|
||||
public:
|
||||
void Lock(const Type::Ptr& ptype, const String& objName);
|
||||
void Unlock(const Type::Ptr& ptype, const String& objName);
|
||||
|
||||
private:
|
||||
std::mutex m_Mutex;
|
||||
std::condition_variable m_CV;
|
||||
std::map<Type*, std::set<String>> m_LockedObjectNames;
|
||||
};
|
||||
|
||||
/**
|
||||
* @ingroup remote
|
||||
*/
|
||||
|
@ -257,6 +277,9 @@ private:
|
|||
mutable std::mutex m_ActivePackageStagesLock;
|
||||
std::map<String, String> m_ActivePackageStages;
|
||||
|
||||
/* ensures that at most one create/update/delete is being processed per object at each time */
|
||||
mutable ObjectNameMutex m_ObjectConfigChangeLock;
|
||||
|
||||
void UpdateActivePackageStagesCache();
|
||||
};
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#include "remote/apilistener.hpp"
|
||||
#include "config/configcompiler.hpp"
|
||||
#include "config/configitem.hpp"
|
||||
#include "base/atomic-file.hpp"
|
||||
#include "base/configwriter.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/dependencygraph.hpp"
|
||||
|
@ -198,13 +199,21 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
|
|||
return false;
|
||||
}
|
||||
|
||||
// AtomicFile doesn't create not yet existing directories, so we have to do it by ourselves.
|
||||
Utility::MkDirP(Utility::DirName(path), 0700);
|
||||
|
||||
std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc);
|
||||
// Using AtomicFile guarantees that two different threads simultaneously creating and loading the same
|
||||
// configuration file do not interfere with each other, as the configuration is stored in a unique temp file.
|
||||
// When one thread fails to pass object validation, it only deletes its temporary file and does not affect
|
||||
// the other thread in any way.
|
||||
AtomicFile fp(path, 0644);
|
||||
fp << config;
|
||||
fp.close();
|
||||
// Flush the output buffer to catch any errors ASAP and handle them accordingly!
|
||||
// Note: AtomicFile places these configs in a temp file and will be automatically
|
||||
// discarded when it is not committed before going out of scope.
|
||||
fp.flush();
|
||||
|
||||
std::unique_ptr<Expression> expr = ConfigCompiler::CompileFile(path, String(), "_api");
|
||||
std::unique_ptr<Expression> expr = ConfigCompiler::CompileText(path, config, String(), "_api");
|
||||
|
||||
try {
|
||||
ActivationScope ascope;
|
||||
|
@ -225,9 +234,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
|
|||
if (!ConfigItem::CommitItems(ascope.GetContext(), upq, newItems, true)) {
|
||||
if (errors) {
|
||||
Log(LogNotice, "ConfigObjectUtility")
|
||||
<< "Failed to commit config item '" << fullName << "'. Aborting and removing config path '" << path << "'.";
|
||||
|
||||
Utility::Remove(path);
|
||||
<< "Failed to commit config item '" << fullName << "'.";
|
||||
|
||||
for (const boost::exception_ptr& ex : upq.GetExceptions()) {
|
||||
errors->Add(DiagnosticInformation(ex, false));
|
||||
|
@ -248,9 +255,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
|
|||
if (!ConfigItem::ActivateItems(newItems, true, false, false, cookie)) {
|
||||
if (errors) {
|
||||
Log(LogNotice, "ConfigObjectUtility")
|
||||
<< "Failed to activate config object '" << fullName << "'. Aborting and removing config path '" << path << "'.";
|
||||
|
||||
Utility::Remove(path);
|
||||
<< "Failed to activate config object '" << fullName << "'.";
|
||||
|
||||
for (const boost::exception_ptr& ex : upq.GetExceptions()) {
|
||||
errors->Add(DiagnosticInformation(ex, false));
|
||||
|
@ -275,6 +280,9 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
|
|||
ConfigObject::Ptr obj = ctype->GetObject(fullName);
|
||||
|
||||
if (obj) {
|
||||
// Object has surpassed the compiling/validation processes, we can safely commit the file!
|
||||
fp.Commit();
|
||||
|
||||
Log(LogInformation, "ConfigObjectUtility")
|
||||
<< "Created and activated object '" << fullName << "' of type '" << type->GetName() << "'.";
|
||||
} else {
|
||||
|
@ -283,8 +291,6 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
|
|||
}
|
||||
|
||||
} catch (const std::exception& ex) {
|
||||
Utility::Remove(path);
|
||||
|
||||
if (errors)
|
||||
errors->Add(DiagnosticInformation(ex, false));
|
||||
|
||||
|
|
Loading…
Reference in New Issue