diff --git a/lib/checker/checkercomponent.cpp b/lib/checker/checkercomponent.cpp index d92101f4e..1c6fdd46d 100644 --- a/lib/checker/checkercomponent.cpp +++ b/lib/checker/checkercomponent.cpp @@ -60,6 +60,7 @@ void CheckerComponent::OnConfigLoaded() void CheckerComponent::Start(bool runtimeCreated) { ObjectImpl::Start(runtimeCreated); + CheckResultProducerComponent::Start(); Log(LogInformation, "CheckerComponent") << "'" << GetName() << "' started."; @@ -81,6 +82,7 @@ void CheckerComponent::Stop(bool runtimeRemoved) m_CV.notify_all(); } + CheckResultProducerComponent::Stop(); m_ResultTimer->Stop(true); m_Thread.join(); @@ -244,7 +246,7 @@ void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable) cr->SetExecutionStart(now); cr->SetExecutionEnd(now); - checkable->ProcessCheckResult(cr); + checkable->ProcessCheckResult(cr, this); Log(LogCritical, "checker", output); } diff --git a/lib/checker/checkercomponent.hpp b/lib/checker/checkercomponent.hpp index 5ace7571c..1159f8ffe 100644 --- a/lib/checker/checkercomponent.hpp +++ b/lib/checker/checkercomponent.hpp @@ -8,6 +8,7 @@ #include "base/configobject.hpp" #include "base/timer.hpp" #include "base/utility.hpp" +#include "remote/crproducer.hpp" #include #include #include @@ -46,7 +47,7 @@ struct CheckableNextCheckExtractor /** * @ingroup checker */ -class CheckerComponent final : public ObjectImpl +class CheckerComponent final : public ObjectImpl, public CheckResultProducerComponent { public: DECLARE_OBJECT(CheckerComponent); diff --git a/lib/compat/externalcommandlistener.cpp b/lib/compat/externalcommandlistener.cpp index b61813beb..63b610ef2 100644 --- a/lib/compat/externalcommandlistener.cpp +++ b/lib/compat/externalcommandlistener.cpp @@ -32,6 +32,7 @@ void ExternalCommandListener::StatsFunc(const Dictionary::Ptr& status, const Arr void ExternalCommandListener::Start(bool runtimeCreated) { ObjectImpl::Start(runtimeCreated); + CheckResultProducerComponent::Start(); Log(LogInformation, "ExternalCommandListener") << "'" << GetName() << "' started."; @@ -50,6 +51,8 @@ void ExternalCommandListener::Start(bool runtimeCreated) */ void ExternalCommandListener::Stop(bool runtimeRemoved) { + CheckResultProducerComponent::Stop(); + Log(LogInformation, "ExternalCommandListener") << "'" << GetName() << "' stopped."; diff --git a/lib/compat/externalcommandlistener.hpp b/lib/compat/externalcommandlistener.hpp index 895531f78..625fc12e8 100644 --- a/lib/compat/externalcommandlistener.hpp +++ b/lib/compat/externalcommandlistener.hpp @@ -7,6 +7,7 @@ #include "base/objectlock.hpp" #include "base/timer.hpp" #include "base/utility.hpp" +#include "remote/crproducer.hpp" #include #include @@ -16,7 +17,7 @@ namespace icinga /** * @ingroup compat */ -class ExternalCommandListener final : public ObjectImpl +class ExternalCommandListener final : public ObjectImpl, public CheckResultProducerComponent { public: DECLARE_OBJECT(ExternalCommandListener); diff --git a/lib/icinga/apiactions.cpp b/lib/icinga/apiactions.cpp index 3892de776..78d4e7a1e 100644 --- a/lib/icinga/apiactions.cpp +++ b/lib/icinga/apiactions.cpp @@ -126,7 +126,8 @@ Dictionary::Ptr ApiActions::ProcessCheckResult(const ConfigObject::Ptr& object, if (params->Contains("ttl")) cr->SetTtl(HttpUtility::GetLastParameter(params, "ttl")); - Result result = checkable->ProcessCheckResult(cr); + Result result = checkable->ProcessCheckResult(cr, ApiListener::GetInstance()); + switch (result) { case Result::Ok: return ApiActions::CreateResult(200, "Successfully processed check result for object '" + checkable->GetName() + "'."); diff --git a/lib/icinga/clusterevents.cpp b/lib/icinga/clusterevents.cpp index b49d2071d..e65a434bc 100644 --- a/lib/icinga/clusterevents.cpp +++ b/lib/icinga/clusterevents.cpp @@ -176,9 +176,9 @@ Value ClusterEvents::CheckResultAPIHandler(const MessageOrigin::Ptr& origin, con } if (!checkable->IsPaused() && Zone::GetLocalZone() == checkable->GetZone() && endpoint == checkable->GetCommandEndpoint()) - checkable->ProcessCheckResult(cr); + checkable->ProcessCheckResult(cr, ApiListener::GetInstance()); else - checkable->ProcessCheckResult(cr, origin); + checkable->ProcessCheckResult(cr, ApiListener::GetInstance(), origin); return Empty; } diff --git a/lib/livestatus/livestatuslistener.cpp b/lib/livestatus/livestatuslistener.cpp index e44650bfe..236302fc7 100644 --- a/lib/livestatus/livestatuslistener.cpp +++ b/lib/livestatus/livestatuslistener.cpp @@ -47,6 +47,7 @@ void LivestatusListener::StatsFunc(const Dictionary::Ptr& status, const Array::P void LivestatusListener::Start(bool runtimeCreated) { ObjectImpl::Start(runtimeCreated); + CheckResultProducerComponent::Start(); Log(LogInformation, "LivestatusListener") << "'" << GetName() << "' started."; @@ -112,6 +113,7 @@ void LivestatusListener::Stop(bool runtimeRemoved) << "'" << GetName() << "' stopped."; m_Listener->Close(); + CheckResultProducerComponent::Stop(); if (m_Thread.joinable()) m_Thread.join(); diff --git a/lib/livestatus/livestatuslistener.hpp b/lib/livestatus/livestatuslistener.hpp index dc739f6f1..5e4a17735 100644 --- a/lib/livestatus/livestatuslistener.hpp +++ b/lib/livestatus/livestatuslistener.hpp @@ -7,6 +7,7 @@ #include "livestatus/livestatuslistener-ti.hpp" #include "livestatus/livestatusquery.hpp" #include "base/socket.hpp" +#include "remote/crproducer.hpp" #include using namespace icinga; @@ -17,7 +18,7 @@ namespace icinga /** * @ingroup livestatus */ -class LivestatusListener final : public ObjectImpl +class LivestatusListener final : public ObjectImpl, public CheckResultProducerComponent { public: DECLARE_OBJECT(LivestatusListener); diff --git a/lib/remote/CMakeLists.txt b/lib/remote/CMakeLists.txt index b80b96067..547112901 100644 --- a/lib/remote/CMakeLists.txt +++ b/lib/remote/CMakeLists.txt @@ -13,7 +13,7 @@ set(remote_SOURCES apilistener.cpp apilistener.hpp apilistener-ti.hpp apilistener-configsync.cpp apilistener-filesync.cpp apilistener-authority.cpp apiuser.cpp apiuser.hpp apiuser-ti.hpp - crproducer.hpp + crproducer.cpp crproducer.hpp configfileshandler.cpp configfileshandler.hpp configobjectslock.cpp configobjectslock.hpp configobjectutility.cpp configobjectutility.hpp diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 772cd2df2..5c71a3654 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -239,6 +239,8 @@ void ApiListener::OnAllConfigLoaded() */ void ApiListener::Start(bool runtimeCreated) { + CheckResultProducerComponent::Start(); + Log(LogInformation, "ApiListener") << "'" << GetName() << "' started."; @@ -368,6 +370,7 @@ void ApiListener::Stop(bool runtimeDeleted) m_Timer->Stop(true); m_RenewOwnCertTimer->Stop(true); + CheckResultProducerComponent::Stop(); ObjectImpl::Stop(runtimeDeleted); Log(LogInformation, "ApiListener") diff --git a/lib/remote/apilistener.hpp b/lib/remote/apilistener.hpp index eae1fa03e..80d678543 100644 --- a/lib/remote/apilistener.hpp +++ b/lib/remote/apilistener.hpp @@ -4,6 +4,7 @@ #define APILISTENER_H #include "remote/apilistener-ti.hpp" +#include "remote/crproducer.hpp" #include "remote/jsonrpcconnection.hpp" #include "remote/httpserverconnection.hpp" #include "remote/endpoint.hpp" @@ -74,7 +75,7 @@ enum class ApiCapabilities : uint_fast64_t /** * @ingroup remote */ -class ApiListener final : public ObjectImpl +class ApiListener final : public ObjectImpl, public CheckResultProducerComponent { public: DECLARE_OBJECT(ApiListener); diff --git a/lib/remote/crproducer.cpp b/lib/remote/crproducer.cpp new file mode 100644 index 000000000..3182e6d85 --- /dev/null +++ b/lib/remote/crproducer.cpp @@ -0,0 +1,92 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#include "remote/crproducer.hpp" + +using namespace icinga; + +bool CheckResultProducerComponent::try_lock_shared() noexcept +{ + auto state (ModifyState( + [](auto current) { return current.InstanceIsActive; }, + [](auto& desired) { ++desired.ProcessingCheckResults; } + )); + + return state.InstanceIsActive; +} + +void CheckResultProducerComponent::unlock_shared() noexcept +{ + std::unique_lock lock (m_Mutex, std::defer_lock); + + auto state (ModifyState([&lock](auto& desired) { + --desired.ProcessingCheckResults; + + if (!desired.ProcessingCheckResults && !desired.InstanceIsActive && !lock) { + lock.lock(); + } + })); + + if (!state.ProcessingCheckResults && !state.InstanceIsActive) { + m_CV.notify_all(); + } +} + +/** + * Allow processing check results. + */ +void CheckResultProducerComponent::Start() +{ + ModifyState([](auto& desired) { desired.InstanceIsActive = 1; }); +} + +/** + * Disallow processing new check results, wait for all currently processed ones to finish. + */ +void CheckResultProducerComponent::Stop() +{ + std::unique_lock lock (m_Mutex, std::defer_lock); + + auto state (ModifyState([&lock](auto& desired) { + desired.InstanceIsActive = 0; + + if (desired.ProcessingCheckResults && !lock) { + lock.lock(); + } + })); + + if (state.ProcessingCheckResults) { + m_CV.wait(lock, [this] { return !m_State.load().ProcessingCheckResults; }); + } +} + +/** + * Load m_State into x and if cond(x), pass x to mod by reference and try to store x back. + * If m_State has changed in the meantime, repeat the process. + * + * @return The (not) updated m_State. + */ +template +inline +CrpComponentState CheckResultProducerComponent::ModifyState(const C& cond, const M& mod) +{ + auto expected (m_State.load()); + decltype(expected) desired; + + do { + if (!cond(expected)) { + return expected; + } + + desired = expected; + mod(desired); + } while (!m_State.compare_exchange_weak(expected, desired)); + + return desired; +} + +template +inline +CrpComponentState CheckResultProducerComponent::ModifyState(const M& mod) +{ + return ModifyState([](auto) { return true; }, mod); +} diff --git a/lib/remote/crproducer.hpp b/lib/remote/crproducer.hpp index 0ee192e5f..5898518d2 100644 --- a/lib/remote/crproducer.hpp +++ b/lib/remote/crproducer.hpp @@ -2,7 +2,11 @@ #pragma once +#include "base/atomic.hpp" #include "base/object.hpp" +#include +#include +#include namespace icinga { @@ -31,4 +35,32 @@ public: virtual void unlock_shared() noexcept = 0; }; +struct CrpComponentState +{ + uint32_t InstanceIsActive = 0; + uint32_t ProcessingCheckResults = 0; +}; + +class CheckResultProducerComponent : public CheckResultProducer +{ +public: + bool try_lock_shared() noexcept override; + void unlock_shared() noexcept override; + +protected: + void Start(); + void Stop(); + +private: + Atomic m_State {CrpComponentState{}}; + std::mutex m_Mutex; + std::condition_variable m_CV; + + template + CrpComponentState ModifyState(const C& cond, const M& mod); + + template + CrpComponentState ModifyState(const M& mod); +}; + }