Merge pull request from Icinga/feature/improve-config-sync-locking

Improve config sync locking
This commit is contained in:
Alexander Aleksandrovič Klimov 2020-11-27 17:55:15 +01:00 committed by GitHub
commit 3f4b09f01c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 116 deletions

View File

@ -64,7 +64,6 @@ set(base_SOURCES
shared-object.hpp shared-object.hpp
singleton.hpp singleton.hpp
socket.cpp socket.hpp socket.cpp socket.hpp
spinlock.cpp spinlock.hpp
stacktrace.cpp stacktrace.hpp stacktrace.cpp stacktrace.hpp
statsfunction.hpp statsfunction.hpp
stdiostream.cpp stdiostream.hpp stdiostream.cpp stdiostream.hpp

View File

@ -49,7 +49,8 @@ static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT; static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
Process::Process(Process::Arguments arguments, Dictionary::Ptr extraEnvironment) Process::Process(Process::Arguments arguments, Dictionary::Ptr extraEnvironment)
: m_Arguments(std::move(arguments)), m_ExtraEnvironment(std::move(extraEnvironment)), m_Timeout(600), m_AdjustPriority(false) : m_Arguments(std::move(arguments)), m_ExtraEnvironment(std::move(extraEnvironment)),
m_Timeout(600), m_AdjustPriority(false), m_ResultAvailable(false)
#ifdef _WIN32 #ifdef _WIN32
, m_ReadPending(false), m_ReadFailed(false), m_Overlapped() , m_ReadPending(false), m_ReadFailed(false), m_Overlapped()
#endif /* _WIN32 */ #endif /* _WIN32 */
@ -1014,6 +1015,12 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
#endif /* _WIN32 */ #endif /* _WIN32 */
} }
const ProcessResult& Process::WaitForResult() {
std::unique_lock<std::mutex> lock(m_ResultMutex);
m_ResultCondition.wait(lock, [this]{ return m_ResultAvailable; });
return m_Result;
}
bool Process::DoEvents() bool Process::DoEvents()
{ {
bool is_timeout = false; bool is_timeout = false;
@ -1121,10 +1128,15 @@ bool Process::DoEvents()
} }
#endif /* _WIN32 */ #endif /* _WIN32 */
{
std::lock_guard<std::mutex> lock(m_ResultMutex);
m_Result.PID = m_PID; m_Result.PID = m_PID;
m_Result.ExecutionEnd = Utility::GetTime(); m_Result.ExecutionEnd = Utility::GetTime();
m_Result.ExitStatus = exitcode; m_Result.ExitStatus = exitcode;
m_Result.Output = output; m_Result.Output = output;
m_ResultAvailable = true;
}
m_ResultCondition.notify_all();
if (m_Callback) if (m_Callback)
Utility::QueueAsyncCallback(std::bind(m_Callback, m_Result)); Utility::QueueAsyncCallback(std::bind(m_Callback, m_Result));

View File

@ -9,6 +9,8 @@
#include <deque> #include <deque>
#include <vector> #include <vector>
#include <sstream> #include <sstream>
#include <mutex>
#include <condition_variable>
namespace icinga namespace icinga
{ {
@ -61,6 +63,8 @@ public:
void Run(const std::function<void (const ProcessResult&)>& callback = std::function<void (const ProcessResult&)>()); void Run(const std::function<void (const ProcessResult&)>& callback = std::function<void (const ProcessResult&)>());
const ProcessResult& WaitForResult();
pid_t GetPID() const; pid_t GetPID() const;
static Arguments PrepareCommand(const Value& command); static Arguments PrepareCommand(const Value& command);
@ -94,6 +98,9 @@ private:
std::ostringstream m_OutputStream; std::ostringstream m_OutputStream;
std::function<void (const ProcessResult&)> m_Callback; std::function<void (const ProcessResult&)> m_Callback;
ProcessResult m_Result; ProcessResult m_Result;
bool m_ResultAvailable;
std::mutex m_ResultMutex;
std::condition_variable m_ResultCondition;
static void IOThreadProc(int tid); static void IOThreadProc(int tid);
bool DoEvents(); bool DoEvents();

View File

@ -1,22 +0,0 @@
/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */
#include "base/spinlock.hpp"
#include <atomic>
using namespace icinga;
void SpinLock::lock()
{
while (m_Locked.test_and_set(std::memory_order_acquire)) {
}
}
bool SpinLock::try_lock()
{
return !m_Locked.test_and_set(std::memory_order_acquire);
}
void SpinLock::unlock()
{
m_Locked.clear(std::memory_order_release);
}

View File

@ -1,35 +0,0 @@
/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */
#ifndef SPINLOCK_H
#define SPINLOCK_H
#include <atomic>
namespace icinga
{
/**
* A spin lock.
*
* @ingroup base
*/
class SpinLock
{
public:
SpinLock() = default;
SpinLock(const SpinLock&) = delete;
SpinLock& operator=(const SpinLock&) = delete;
SpinLock(SpinLock&&) = delete;
SpinLock& operator=(SpinLock&&) = delete;
void lock();
bool try_lock();
void unlock();
private:
std::atomic_flag m_Locked = ATOMIC_FLAG_INIT;
};
}
#endif /* SPINLOCK_H */

View File

@ -20,7 +20,7 @@ using namespace icinga;
REGISTER_APIFUNCTION(Update, config, &ApiListener::ConfigUpdateHandler); REGISTER_APIFUNCTION(Update, config, &ApiListener::ConfigUpdateHandler);
SpinLock ApiListener::m_ConfigSyncStageLock; std::mutex ApiListener::m_ConfigSyncStageLock;
/** /**
* Entrypoint for updating all authoritative configs from /etc/zones.d, packages, etc. * Entrypoint for updating all authoritative configs from /etc/zones.d, packages, etc.
@ -330,7 +330,7 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
/* Only one transaction is allowed, concurrent message handlers need to wait. /* Only one transaction is allowed, concurrent message handlers need to wait.
* This affects two parent endpoints sending the config in the same moment. * This affects two parent endpoints sending the config in the same moment.
*/ */
auto lock (Shared<std::unique_lock<SpinLock>>::Make(m_ConfigSyncStageLock)); std::lock_guard<std::mutex> lock(m_ConfigSyncStageLock);
String apiZonesStageDir = GetApiZonesStageDir(); String apiZonesStageDir = GetApiZonesStageDir();
String fromEndpointName = origin->FromClient->GetEndpoint()->GetName(); String fromEndpointName = origin->FromClient->GetEndpoint()->GetName();
@ -544,7 +544,7 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
Log(LogInformation, "ApiListener") Log(LogInformation, "ApiListener")
<< "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName << "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName
<< "' are different to production, triggering validation and reload."; << "' are different to production, triggering validation and reload.";
AsyncTryActivateZonesStage(relativePaths, lock); TryActivateZonesStage(relativePaths);
} else { } else {
Log(LogInformation, "ApiListener") Log(LogInformation, "ApiListener")
<< "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName << "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName
@ -554,17 +554,44 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
} }
/** /**
* Callback for stage config validation. * Spawns a new validation process with 'System.ZonesStageVarDir' set to override the config validation zone dirs with
* When validation was successful, the configuration is copied from * our current stage. Then waits for the validation result and if it was successful, the configuration is copied from
* stage to production and a restart is triggered. * stage to production and a restart is triggered. On validation failure, there is no restart and this is logged.
* On failure, there's no restart and this is logged. *
* The caller of this function must hold m_ConfigSyncStageLock.
* *
* @param pr Result of the validation process.
* @param relativePaths Collected paths including the zone name, which are copied from stage to current directories. * @param relativePaths Collected paths including the zone name, which are copied from stage to current directories.
*/ */
void ApiListener::TryActivateZonesStageCallback(const ProcessResult& pr, void ApiListener::TryActivateZonesStage(const std::vector<String>& relativePaths)
const std::vector<String>& relativePaths)
{ {
VERIFY(Application::GetArgC() >= 1);
/* Inherit parent process args. */
Array::Ptr args = new Array({
Application::GetExePath(Application::GetArgV()[0]),
});
for (int i = 1; i < Application::GetArgC(); i++) {
String argV = Application::GetArgV()[i];
if (argV == "-d" || argV == "--daemonize")
continue;
args->Add(argV);
}
args->Add("--validate");
// Set the ZonesStageDir. This creates our own local chroot without any additional automated zone includes.
args->Add("--define");
args->Add("System.ZonesStageVarDir=" + GetApiZonesStageDir());
Process::Ptr process = new Process(Process::PrepareCommand(args));
process->SetTimeout(Application::GetReloadTimeout());
process->Run();
const ProcessResult& pr = process->WaitForResult();
String apiZonesDir = GetApiZonesDir(); String apiZonesDir = GetApiZonesDir();
String apiZonesStageDir = GetApiZonesStageDir(); String apiZonesStageDir = GetApiZonesStageDir();
@ -628,44 +655,6 @@ void ApiListener::TryActivateZonesStageCallback(const ProcessResult& pr,
listener->UpdateLastFailedZonesStageValidation(pr.Output); listener->UpdateLastFailedZonesStageValidation(pr.Output);
} }
/**
* Spawns a new validation process and waits for its output.
* Sets 'System.ZonesStageVarDir' to override the config validation zone dirs with our current stage.
*
* @param relativePaths Required for later file operations in the callback. Provides the zone name plus path in a list.
*/
void ApiListener::AsyncTryActivateZonesStage(const std::vector<String>& relativePaths, const Shared<std::unique_lock<SpinLock>>::Ptr& lock)
{
VERIFY(Application::GetArgC() >= 1);
/* Inherit parent process args. */
Array::Ptr args = new Array({
Application::GetExePath(Application::GetArgV()[0]),
});
for (int i = 1; i < Application::GetArgC(); i++) {
String argV = Application::GetArgV()[i];
if (argV == "-d" || argV == "--daemonize")
continue;
args->Add(argV);
}
args->Add("--validate");
// Set the ZonesStageDir. This creates our own local chroot without any additional automated zone includes.
args->Add("--define");
args->Add("System.ZonesStageVarDir=" + GetApiZonesStageDir());
Process::Ptr process = new Process(Process::PrepareCommand(args));
process->SetTimeout(Application::GetReloadTimeout());
process->Run([relativePaths, lock](const ProcessResult& pr) {
TryActivateZonesStageCallback(pr, relativePaths);
});
}
/** /**
* Update the structure from the last failed validation output. * Update the structure from the last failed validation output.
* Uses the current timestamp. * Uses the current timestamp.

View File

@ -11,7 +11,6 @@
#include "base/configobject.hpp" #include "base/configobject.hpp"
#include "base/process.hpp" #include "base/process.hpp"
#include "base/shared.hpp" #include "base/shared.hpp"
#include "base/spinlock.hpp"
#include "base/timer.hpp" #include "base/timer.hpp"
#include "base/workqueue.hpp" #include "base/workqueue.hpp"
#include "base/tcpsocket.hpp" #include "base/tcpsocket.hpp"
@ -188,7 +187,7 @@ private:
void RemoveStatusFile(); void RemoveStatusFile();
/* filesync */ /* filesync */
static SpinLock m_ConfigSyncStageLock; static std::mutex m_ConfigSyncStageLock;
void SyncLocalZoneDirs() const; void SyncLocalZoneDirs() const;
void SyncLocalZoneDir(const Zone::Ptr& zone) const; void SyncLocalZoneDir(const Zone::Ptr& zone) const;
@ -200,9 +199,7 @@ private:
static ConfigDirInformation LoadConfigDir(const String& dir); static ConfigDirInformation LoadConfigDir(const String& dir);
static void ConfigGlobHandler(ConfigDirInformation& config, const String& path, const String& file); static void ConfigGlobHandler(ConfigDirInformation& config, const String& path, const String& file);
static void TryActivateZonesStageCallback(const ProcessResult& pr, static void TryActivateZonesStage(const std::vector<String>& relativePaths);
const std::vector<String>& relativePaths);
static void AsyncTryActivateZonesStage(const std::vector<String>& relativePaths, const Shared<std::unique_lock<SpinLock>>::Ptr& lock);
static String GetChecksum(const String& content); static String GetChecksum(const String& content);
static bool CheckConfigChange(const ConfigDirInformation& oldConfig, const ConfigDirInformation& newConfig); static bool CheckConfigChange(const ConfigDirInformation& oldConfig, const ConfigDirInformation& newConfig);