1
0
mirror of https://github.com/Icinga/icinga2.git synced 2025-04-08 17:05:25 +02:00

Merge pull request from Icinga/feature/icingadb-redis-lost-history-memory

Icinga DB: keep history in memory until written to Redis
This commit is contained in:
Julian Brost 2022-03-02 19:46:01 +01:00 committed by GitHub
commit 23a4640ccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 292 additions and 23 deletions

@ -19,6 +19,7 @@ set(base_SOURCES
atomic.hpp
base64.cpp base64.hpp
boolean.cpp boolean.hpp boolean-script.cpp
bulker.hpp
configobject.cpp configobject.hpp configobject-ti.hpp configobject-script.cpp
configtype.cpp configtype.hpp
configuration.cpp configuration.hpp configuration-ti.hpp

119
lib/base/bulker.hpp Normal file

@ -0,0 +1,119 @@
/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
#ifndef BULKER_H
#define BULKER_H
#include <boost/config.hpp>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
namespace icinga
{
/**
* A queue which outputs the input as bulks of a defined size
* or after a defined time, whichever is reached first
*
* @ingroup base
*/
template<class T>
class Bulker
{
private:
typedef std::chrono::steady_clock Clock;
public:
typedef std::vector<T> Container;
typedef typename Container::size_type SizeType;
typedef typename Clock::duration Duration;
Bulker(SizeType bulkSize, Duration threshold)
: m_BulkSize(bulkSize), m_Threshold(threshold), m_NextConsumption(NullTimePoint()) { }
void ProduceOne(T needle);
Container ConsumeMany();
SizeType Size();
inline SizeType GetBulkSize() const noexcept
{
return m_BulkSize;
}
private:
typedef std::chrono::time_point<Clock> TimePoint;
static inline
TimePoint NullTimePoint()
{
return TimePoint::min();
}
inline void UpdateNextConsumption()
{
m_NextConsumption = Clock::now() + m_Threshold;
}
const SizeType m_BulkSize;
const Duration m_Threshold;
std::mutex m_Mutex;
std::condition_variable m_CV;
std::queue<Container> m_Bulks;
TimePoint m_NextConsumption;
};
template<class T>
void Bulker<T>::ProduceOne(T needle)
{
std::unique_lock<std::mutex> lock (m_Mutex);
if (m_Bulks.empty() || m_Bulks.back().size() == m_BulkSize) {
m_Bulks.emplace();
}
m_Bulks.back().emplace_back(std::move(needle));
if (m_Bulks.size() == 1u && m_Bulks.back().size() == m_BulkSize) {
m_CV.notify_one();
}
}
template<class T>
typename Bulker<T>::Container Bulker<T>::ConsumeMany()
{
std::unique_lock<std::mutex> lock (m_Mutex);
if (BOOST_UNLIKELY(m_NextConsumption == NullTimePoint())) {
UpdateNextConsumption();
}
auto deadline (m_NextConsumption);
m_CV.wait_until(lock, deadline, [this]() { return !m_Bulks.empty() && m_Bulks.front().size() == m_BulkSize; });
UpdateNextConsumption();
if (m_Bulks.empty()) {
return Container();
}
auto haystack (std::move(m_Bulks.front()));
m_Bulks.pop();
return haystack;
}
template<class T>
typename Bulker<T>::SizeType Bulker<T>::Size()
{
std::unique_lock<std::mutex> lock (m_Mutex);
return m_Bulks.empty() ? 0 : (m_Bulks.size() - 1u) * m_BulkSize + m_Bulks.back().size();
}
}
#endif /* BULKER_H */

@ -25,7 +25,7 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
ioEngine.m_AlreadyExpiredTimer.async_wait(yc);
IoEngine::YieldCurrentCoroutine(yc);
continue;
}
@ -64,7 +64,7 @@ IoBoundWorkSlot::~IoBoundWorkSlot()
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
ioEngine.m_AlreadyExpiredTimer.async_wait(yc);
IoEngine::YieldCurrentCoroutine(yc);
continue;
}

@ -119,6 +119,12 @@ public:
);
}
static inline
void YieldCurrentCoroutine(boost::asio::yield_context yc)
{
Get().m_AlreadyExpiredTimer.async_wait(yc);
}
private:
IoEngine();

@ -30,6 +30,8 @@
#include "icinga/timeperiod.hpp"
#include "icinga/pluginutility.hpp"
#include "remote/zone.hpp"
#include <chrono>
#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
@ -1623,8 +1625,9 @@ unsigned short GetPreviousState(const Checkable::Ptr& checkable, const Service::
void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (!checkable)
@ -1707,7 +1710,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
xAdd.emplace_back(GetObjectIdentifier(endpoint));
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendSentNotification(
@ -1715,8 +1718,9 @@ void IcingaDB::SendSentNotification(
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime
)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
Host::Ptr host;
Service::Ptr service;
@ -1779,13 +1783,14 @@ void IcingaDB::SendSentNotification(
xAdd.emplace_back(JsonEncode(users_notified));
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
SendConfigUpdate(downtime, true);
@ -1865,13 +1870,14 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(scheduledBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
auto checkable (downtime->GetCheckable());
auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy()));
@ -1955,12 +1961,12 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(scheduledBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
{
if (!m_Rcon || !m_Rcon->IsConnected() || comment->GetEntryType() != CommentUser)
if (comment->GetEntryType() != CommentUser || !GetActive())
return;
auto checkable (comment->GetCheckable());
@ -2010,14 +2016,15 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
}
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
auto checkable (comment->GetCheckable());
@ -2079,14 +2086,15 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
}
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
Host::Ptr host;
Service::Ptr service;
@ -2144,7 +2152,7 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change
xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), startTime})));
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
@ -2176,8 +2184,9 @@ void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
Host::Ptr host;
Service::Ptr service;
@ -2228,13 +2237,14 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime})));
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange)
{
if (!m_Rcon || !m_Rcon->IsConnected())
if (!GetActive()) {
return;
}
Host::Ptr host;
Service::Ptr service;
@ -2282,7 +2292,79 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
xAdd.emplace_back(removedBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::ForwardHistoryEntries()
{
using clock = std::chrono::steady_clock;
const std::chrono::seconds logInterval (10);
auto nextLog (clock::now() + logInterval);
auto logPeriodically ([this, logInterval, &nextLog]() {
if (clock::now() > nextLog) {
nextLog += logInterval;
auto size (m_HistoryBulker.Size());
Log(size > m_HistoryBulker.GetBulkSize() ? LogInformation : LogNotice, "IcingaDB")
<< "Pending history queries: " << size;
}
});
for (;;) {
logPeriodically();
auto haystack (m_HistoryBulker.ConsumeMany());
if (haystack.empty()) {
if (!GetActive()) {
break;
}
continue;
}
uintmax_t attempts = 0;
auto logFailure ([&haystack, &attempts](const char* err = nullptr) {
Log msg (LogNotice, "IcingaDB");
msg << "history: " << haystack.size() << " queries failed temporarily (attempt #" << ++attempts << ")";
if (err) {
msg << ": " << err;
}
});
for (;;) {
logPeriodically();
if (m_Rcon && m_Rcon->IsConnected()) {
try {
m_Rcon->GetResultsOfQueries(haystack, Prio::History);
break;
} catch (const std::exception& ex) {
logFailure(ex.what());
} catch (...) {
logFailure();
}
} else {
logFailure("not connected to Redis");
}
if (!GetActive()) {
Log(LogCritical, "IcingaDB") << "history: " << haystack.size() << " queries failed (attempt #" << attempts
<< ") while we're about to shut down. Giving up and discarding additional "
<< m_HistoryBulker.Size() << " queued history queries.";
return;
}
Utility::Sleep(2);
}
}
}
void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) {

@ -7,6 +7,8 @@
#include "remote/eventqueue.hpp"
#include "base/configuration.hpp"
#include "base/json.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
#include "base/tlsutility.hpp"
#include "base/utility.hpp"
#include "icinga/checkable.hpp"
@ -27,6 +29,8 @@ std::once_flag IcingaDB::m_EnvironmentIdOnce;
REGISTER_TYPE(IcingaDB);
REGISTER_STATSFUNCTION(IcingaDB, &IcingaDB::StatsFunc);
IcingaDB::IcingaDB()
: m_Rcon(nullptr)
{
@ -38,6 +42,28 @@ IcingaDB::IcingaDB()
m_PrefixConfigCheckSum = "icinga:checksum:";
}
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
*/
void IcingaDB::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
for (auto& icingadb : ConfigType::GetObjectsByType<IcingaDB>()) {
auto historyBufferItems (icingadb->m_HistoryBulker.Size());
nodes.emplace_back(icingadb->GetName(), new Dictionary({
{ "history_buffer_items", historyBufferItems }
}));
perfdata->Add(new PerfdataValue("icingadb_" + icingadb->GetName() + "_history_buffer_items", historyBufferItems));
}
status->Set("icingadb", new Dictionary(std::move(nodes)));
}
void IcingaDB::Validate(int types, const ValidationUtils& utils)
{
ObjectImpl<IcingaDB>::Validate(types, utils);
@ -145,6 +171,10 @@ void IcingaDB::Start(bool runtimeCreated)
m_Rcon->SuppressQueryKind(Prio::CheckResult);
m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync);
Ptr keepAlive (this);
m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); });
}
void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
@ -203,6 +233,15 @@ void IcingaDB::PublishStats()
void IcingaDB::Stop(bool runtimeRemoved)
{
Log(LogInformation, "IcingaDB")
<< "Flushing history data buffer to Redis.";
if (m_HistoryThread.wait_for(std::chrono::minutes(1)) == std::future_status::timeout) {
Log(LogCritical, "IcingaDB")
<< "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding "
<< m_HistoryBulker.Size() << " queued history queries.";
}
Log(LogInformation, "IcingaDB")
<< "'" << GetName() << "' stopped.";

@ -5,6 +5,7 @@
#include "icingadb/icingadb-ti.hpp"
#include "icingadb/redisconnection.hpp"
#include "base/bulker.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include "icinga/customvarobject.hpp"
@ -13,6 +14,8 @@
#include "icinga/downtime.hpp"
#include "remote/messageorigin.hpp"
#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <mutex>
#include <set>
@ -34,6 +37,7 @@ public:
IcingaDB();
static void ConfigStaticInitialize();
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
void Validate(int types, const ValidationUtils& utils) override;
virtual void Start(bool runtimeCreated) override;
@ -111,6 +115,8 @@ private:
void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void ForwardHistoryEntries();
std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable);
@ -176,6 +182,9 @@ private:
Timer::Ptr m_StatsTimer;
WorkQueue m_WorkQueue{0, 1, LogNotice};
std::future<void> m_HistoryThread;
Bulker<RedisConnection::Query> m_HistoryBulker {4096, std::chrono::milliseconds(250)};
String m_PrefixConfigObject;
String m_PrefixConfigCheckSum;

@ -262,6 +262,12 @@ void RedisConnection::Connect(asio::yield_context& yc)
boost::asio::deadline_timer timer (m_Strand.context());
auto waitForReadLoop ([this, &yc]() {
while (!m_Queues.FutureResponseActions.empty()) {
IoEngine::YieldCurrentCoroutine(yc);
}
});
for (;;) {
try {
if (m_Path.IsEmpty()) {
@ -294,6 +300,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
}
Handshake(conn, yc);
waitForReadLoop();
m_TlsConn = std::move(conn);
} else {
Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
@ -305,6 +312,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc);
waitForReadLoop();
m_TcpConn = std::move(conn);
}
} else {
@ -317,6 +325,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc);
waitForReadLoop();
m_UnixConn = std::move(conn);
}
@ -413,12 +422,16 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
throw;
} catch (...) {
promise.set_exception(std::current_exception());
continue;
break;
}
}
promise.set_value(std::move(replies));
try {
promise.set_value(std::move(replies));
} catch (const std::future_error&) {
// Complaint about the above op is not allowed
// due to promise.set_exception() was already called
}
}
}
}