*#Stop(): wait for own Checkable#ProcessCheckResult()s to finish

This commit is contained in:
Alexander A. Klimov 2025-04-02 12:16:07 +02:00
parent 23c2365115
commit c124f4beef
13 changed files with 148 additions and 9 deletions

View File

@ -60,6 +60,7 @@ void CheckerComponent::OnConfigLoaded()
void CheckerComponent::Start(bool runtimeCreated)
{
ObjectImpl<CheckerComponent>::Start(runtimeCreated);
CheckResultProducerComponent::Start();
Log(LogInformation, "CheckerComponent")
<< "'" << GetName() << "' started.";
@ -81,6 +82,7 @@ void CheckerComponent::Stop(bool runtimeRemoved)
m_CV.notify_all();
}
CheckResultProducerComponent::Stop();
m_ResultTimer->Stop(true);
m_Thread.join();
@ -244,7 +246,7 @@ void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
cr->SetExecutionStart(now);
cr->SetExecutionEnd(now);
checkable->ProcessCheckResult(cr);
checkable->ProcessCheckResult(cr, this);
Log(LogCritical, "checker", output);
}

View File

@ -8,6 +8,7 @@
#include "base/configobject.hpp"
#include "base/timer.hpp"
#include "base/utility.hpp"
#include "remote/crproducer.hpp"
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
@ -46,7 +47,7 @@ struct CheckableNextCheckExtractor
/**
* @ingroup checker
*/
class CheckerComponent final : public ObjectImpl<CheckerComponent>
class CheckerComponent final : public ObjectImpl<CheckerComponent>, public CheckResultProducerComponent
{
public:
DECLARE_OBJECT(CheckerComponent);

View File

@ -32,6 +32,7 @@ void ExternalCommandListener::StatsFunc(const Dictionary::Ptr& status, const Arr
void ExternalCommandListener::Start(bool runtimeCreated)
{
ObjectImpl<ExternalCommandListener>::Start(runtimeCreated);
CheckResultProducerComponent::Start();
Log(LogInformation, "ExternalCommandListener")
<< "'" << GetName() << "' started.";
@ -50,6 +51,8 @@ void ExternalCommandListener::Start(bool runtimeCreated)
*/
void ExternalCommandListener::Stop(bool runtimeRemoved)
{
CheckResultProducerComponent::Stop();
Log(LogInformation, "ExternalCommandListener")
<< "'" << GetName() << "' stopped.";

View File

@ -7,6 +7,7 @@
#include "base/objectlock.hpp"
#include "base/timer.hpp"
#include "base/utility.hpp"
#include "remote/crproducer.hpp"
#include <thread>
#include <iostream>
@ -16,7 +17,7 @@ namespace icinga
/**
* @ingroup compat
*/
class ExternalCommandListener final : public ObjectImpl<ExternalCommandListener>
class ExternalCommandListener final : public ObjectImpl<ExternalCommandListener>, public CheckResultProducerComponent
{
public:
DECLARE_OBJECT(ExternalCommandListener);

View File

@ -126,7 +126,8 @@ Dictionary::Ptr ApiActions::ProcessCheckResult(const ConfigObject::Ptr& object,
if (params->Contains("ttl"))
cr->SetTtl(HttpUtility::GetLastParameter(params, "ttl"));
Result result = checkable->ProcessCheckResult(cr);
Result result = checkable->ProcessCheckResult(cr, ApiListener::GetInstance());
switch (result) {
case Result::Ok:
return ApiActions::CreateResult(200, "Successfully processed check result for object '" + checkable->GetName() + "'.");

View File

@ -176,9 +176,9 @@ Value ClusterEvents::CheckResultAPIHandler(const MessageOrigin::Ptr& origin, con
}
if (!checkable->IsPaused() && Zone::GetLocalZone() == checkable->GetZone() && endpoint == checkable->GetCommandEndpoint())
checkable->ProcessCheckResult(cr);
checkable->ProcessCheckResult(cr, ApiListener::GetInstance());
else
checkable->ProcessCheckResult(cr, origin);
checkable->ProcessCheckResult(cr, ApiListener::GetInstance(), origin);
return Empty;
}

View File

@ -47,6 +47,7 @@ void LivestatusListener::StatsFunc(const Dictionary::Ptr& status, const Array::P
void LivestatusListener::Start(bool runtimeCreated)
{
ObjectImpl<LivestatusListener>::Start(runtimeCreated);
CheckResultProducerComponent::Start();
Log(LogInformation, "LivestatusListener")
<< "'" << GetName() << "' started.";
@ -112,6 +113,7 @@ void LivestatusListener::Stop(bool runtimeRemoved)
<< "'" << GetName() << "' stopped.";
m_Listener->Close();
CheckResultProducerComponent::Stop();
if (m_Thread.joinable())
m_Thread.join();

View File

@ -7,6 +7,7 @@
#include "livestatus/livestatuslistener-ti.hpp"
#include "livestatus/livestatusquery.hpp"
#include "base/socket.hpp"
#include "remote/crproducer.hpp"
#include <thread>
using namespace icinga;
@ -17,7 +18,7 @@ namespace icinga
/**
* @ingroup livestatus
*/
class LivestatusListener final : public ObjectImpl<LivestatusListener>
class LivestatusListener final : public ObjectImpl<LivestatusListener>, public CheckResultProducerComponent
{
public:
DECLARE_OBJECT(LivestatusListener);

View File

@ -13,7 +13,7 @@ set(remote_SOURCES
apilistener.cpp apilistener.hpp apilistener-ti.hpp apilistener-configsync.cpp apilistener-filesync.cpp
apilistener-authority.cpp
apiuser.cpp apiuser.hpp apiuser-ti.hpp
crproducer.hpp
crproducer.cpp crproducer.hpp
configfileshandler.cpp configfileshandler.hpp
configobjectslock.cpp configobjectslock.hpp
configobjectutility.cpp configobjectutility.hpp

View File

@ -239,6 +239,8 @@ void ApiListener::OnAllConfigLoaded()
*/
void ApiListener::Start(bool runtimeCreated)
{
CheckResultProducerComponent::Start();
Log(LogInformation, "ApiListener")
<< "'" << GetName() << "' started.";
@ -368,6 +370,7 @@ void ApiListener::Stop(bool runtimeDeleted)
m_Timer->Stop(true);
m_RenewOwnCertTimer->Stop(true);
CheckResultProducerComponent::Stop();
ObjectImpl<ApiListener>::Stop(runtimeDeleted);
Log(LogInformation, "ApiListener")

View File

@ -4,6 +4,7 @@
#define APILISTENER_H
#include "remote/apilistener-ti.hpp"
#include "remote/crproducer.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/httpserverconnection.hpp"
#include "remote/endpoint.hpp"
@ -74,7 +75,7 @@ enum class ApiCapabilities : uint_fast64_t
/**
* @ingroup remote
*/
class ApiListener final : public ObjectImpl<ApiListener>
class ApiListener final : public ObjectImpl<ApiListener>, public CheckResultProducerComponent
{
public:
DECLARE_OBJECT(ApiListener);

92
lib/remote/crproducer.cpp Normal file
View File

@ -0,0 +1,92 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include "remote/crproducer.hpp"
using namespace icinga;
bool CheckResultProducerComponent::try_lock_shared() noexcept
{
auto state (ModifyState(
[](auto current) { return current.InstanceIsActive; },
[](auto& desired) { ++desired.ProcessingCheckResults; }
));
return state.InstanceIsActive;
}
void CheckResultProducerComponent::unlock_shared() noexcept
{
std::unique_lock lock (m_Mutex, std::defer_lock);
auto state (ModifyState([&lock](auto& desired) {
--desired.ProcessingCheckResults;
if (!desired.ProcessingCheckResults && !desired.InstanceIsActive && !lock) {
lock.lock();
}
}));
if (!state.ProcessingCheckResults && !state.InstanceIsActive) {
m_CV.notify_all();
}
}
/**
* Allow processing check results.
*/
void CheckResultProducerComponent::Start()
{
ModifyState([](auto& desired) { desired.InstanceIsActive = 1; });
}
/**
* Disallow processing new check results, wait for all currently processed ones to finish.
*/
void CheckResultProducerComponent::Stop()
{
std::unique_lock lock (m_Mutex, std::defer_lock);
auto state (ModifyState([&lock](auto& desired) {
desired.InstanceIsActive = 0;
if (desired.ProcessingCheckResults && !lock) {
lock.lock();
}
}));
if (state.ProcessingCheckResults) {
m_CV.wait(lock, [this] { return !m_State.load().ProcessingCheckResults; });
}
}
/**
* Load m_State into x and if cond(x), pass x to mod by reference and try to store x back.
* If m_State has changed in the meantime, repeat the process.
*
* @return The (not) updated m_State.
*/
template<class C, class M>
inline
CrpComponentState CheckResultProducerComponent::ModifyState(const C& cond, const M& mod)
{
auto expected (m_State.load());
decltype(expected) desired;
do {
if (!cond(expected)) {
return expected;
}
desired = expected;
mod(desired);
} while (!m_State.compare_exchange_weak(expected, desired));
return desired;
}
template<class M>
inline
CrpComponentState CheckResultProducerComponent::ModifyState(const M& mod)
{
return ModifyState([](auto) { return true; }, mod);
}

View File

@ -2,7 +2,11 @@
#pragma once
#include "base/atomic.hpp"
#include "base/object.hpp"
#include <condition_variable>
#include <cstdint>
#include <mutex>
namespace icinga
{
@ -31,4 +35,32 @@ public:
virtual void unlock_shared() noexcept = 0;
};
struct CrpComponentState
{
uint32_t InstanceIsActive = 0;
uint32_t ProcessingCheckResults = 0;
};
class CheckResultProducerComponent : public CheckResultProducer
{
public:
bool try_lock_shared() noexcept override;
void unlock_shared() noexcept override;
protected:
void Start();
void Stop();
private:
Atomic<CrpComponentState> m_State {CrpComponentState{}};
std::mutex m_Mutex;
std::condition_variable m_CV;
template<class C, class M>
CrpComponentState ModifyState(const C& cond, const M& mod);
template<class M>
CrpComponentState ModifyState(const M& mod);
};
}