Icinga DB: keep history in memory until written to Redis

by putting the messages into a Bulker and retrying each chunk.
This commit is contained in:
Alexander A. Klimov 2022-01-31 12:38:02 +01:00
parent 9a8d388734
commit 8ea62f7fc7
4 changed files with 87 additions and 34 deletions

View File

@ -36,6 +36,7 @@ public:
void ProduceOne(T needle);
Container ConsumeMany();
SizeType Size();
private:
typedef std::chrono::time_point<Clock> TimePoint;
@ -100,6 +101,14 @@ typename Bulker<T>::Container Bulker<T>::ConsumeMany()
return haystack;
}
template<class T>
typename Bulker<T>::SizeType Bulker<T>::Size()
{
std::unique_lock<std::mutex> lock (m_Mutex);
return m_Bulks.empty() ? 0 : (m_Bulks.size() - 1u) * m_BulkSize + m_Bulks.back().size();
}
}
#endif /* BULKER_H */

View File

@ -30,6 +30,7 @@
#include "icinga/timeperiod.hpp"
#include "icinga/pluginutility.hpp"
#include "remote/zone.hpp"
#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
@ -1623,9 +1624,6 @@ unsigned short GetPreviousState(const Checkable::Ptr& checkable, const Service::
void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (!checkable)
return;
@ -1707,7 +1705,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
xAdd.emplace_back(GetObjectIdentifier(endpoint));
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendSentNotification(
@ -1715,9 +1713,6 @@ void IcingaDB::SendSentNotification(
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime
)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -1779,14 +1774,11 @@ void IcingaDB::SendSentNotification(
xAdd.emplace_back(JsonEncode(users_notified));
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
SendConfigUpdate(downtime, true);
auto checkable (downtime->GetCheckable());
@ -1865,14 +1857,11 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(scheduledBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
auto checkable (downtime->GetCheckable());
auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy()));
@ -1955,12 +1944,12 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(scheduledBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
{
if (!m_Rcon || !m_Rcon->IsConnected() || comment->GetEntryType() != CommentUser)
if (comment->GetEntryType() != CommentUser)
return;
auto checkable (comment->GetCheckable());
@ -2010,15 +1999,12 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
}
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
auto checkable (comment->GetCheckable());
Host::Ptr host;
@ -2079,15 +2065,12 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
}
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -2144,7 +2127,7 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change
xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), startTime})));
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
@ -2176,9 +2159,6 @@ void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -2228,14 +2208,11 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime})));
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -2282,7 +2259,59 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
xAdd.emplace_back(removedBy);
}
m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::ForwardHistoryEntries()
{
for (;;) {
auto haystack (m_HistoryBulker.ConsumeMany());
if (haystack.empty()) {
if (!GetActive()) {
break;
}
continue;
}
uintmax_t attempts = 0;
auto logFailure ([&haystack, &attempts](const char* err = nullptr) {
Log msg (LogNotice, "IcingaDB");
msg << "history: " << haystack.size() << " queries failed temporarily (attempt #" << ++attempts << ")";
if (err) {
msg << ": " << err;
}
});
for (;;) {
if (m_Rcon && m_Rcon->IsConnected()) {
try {
m_Rcon->GetResultsOfQueries(haystack, Prio::History);
break;
} catch (const std::exception& ex) {
logFailure(ex.what());
} catch (...) {
logFailure();
}
} else {
logFailure("not connected to Redis");
}
if (!GetActive()) {
Log(LogCritical, "IcingaDB") << "history: " << haystack.size() << " queries failed (attempt #" << attempts
<< ") while we're about to shut down. Giving up and discarding additional "
<< m_HistoryBulker.Size() << " queued history queries.";
return;
}
Utility::Sleep(2);
}
}
}
void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) {

View File

@ -145,6 +145,8 @@ void IcingaDB::Start(bool runtimeCreated)
m_Rcon->SuppressQueryKind(Prio::CheckResult);
m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync);
m_HistoryThread = std::thread([this]() { ForwardHistoryEntries(); });
}
void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
@ -203,6 +205,11 @@ void IcingaDB::PublishStats()
void IcingaDB::Stop(bool runtimeRemoved)
{
Log(LogInformation, "IcingaDB")
<< "Flushing history data buffer to Redis.";
m_HistoryThread.join();
Log(LogInformation, "IcingaDB")
<< "'" << GetName() << "' stopped.";

View File

@ -5,6 +5,7 @@
#include "icingadb/icingadb-ti.hpp"
#include "icingadb/redisconnection.hpp"
#include "base/bulker.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include "icinga/customvarobject.hpp"
@ -13,9 +14,11 @@
#include "icinga/downtime.hpp"
#include "remote/messageorigin.hpp"
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <unordered_map>
#include <utility>
@ -111,6 +114,8 @@ private:
void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void ForwardHistoryEntries();
std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable);
@ -176,6 +181,9 @@ private:
Timer::Ptr m_StatsTimer;
WorkQueue m_WorkQueue{0, 1, LogNotice};
std::thread m_HistoryThread;
Bulker<RedisConnection::Query> m_HistoryBulker {4096, std::chrono::milliseconds(250)};
String m_PrefixConfigObject;
String m_PrefixConfigCheckSum;