diff --git a/lib/base/bulker.hpp b/lib/base/bulker.hpp index d9deaf661..711aae04f 100644 --- a/lib/base/bulker.hpp +++ b/lib/base/bulker.hpp @@ -36,6 +36,7 @@ public: void ProduceOne(T needle); Container ConsumeMany(); + SizeType Size(); private: typedef std::chrono::time_point TimePoint; @@ -100,6 +101,14 @@ typename Bulker::Container Bulker::ConsumeMany() return haystack; } +template +typename Bulker::SizeType Bulker::Size() +{ + std::unique_lock lock (m_Mutex); + + return m_Bulks.empty() ? 0 : (m_Bulks.size() - 1u) * m_BulkSize + m_Bulks.back().size(); +} + } #endif /* BULKER_H */ diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index e075db183..5f2117e57 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -30,6 +30,7 @@ #include "icinga/timeperiod.hpp" #include "icinga/pluginutility.hpp" #include "remote/zone.hpp" +#include #include #include #include @@ -1623,9 +1624,6 @@ unsigned short GetPreviousState(const Checkable::Ptr& checkable, const Service:: void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - Checkable::Ptr checkable = dynamic_pointer_cast(object); if (!checkable) return; @@ -1707,7 +1705,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendSentNotification( @@ -1715,9 +1713,6 @@ void IcingaDB::SendSentNotification( NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime ) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); @@ -1779,14 +1774,11 @@ void IcingaDB::SendSentNotification( xAdd.emplace_back(JsonEncode(users_notified)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - SendConfigUpdate(downtime, true); auto checkable (downtime->GetCheckable()); @@ -1865,14 +1857,11 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(scheduledBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - auto checkable (downtime->GetCheckable()); auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); @@ -1955,12 +1944,12 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(scheduledBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendAddedComment(const Comment::Ptr& comment) { - if (!m_Rcon || !m_Rcon->IsConnected() || comment->GetEntryType() != CommentUser) + if (comment->GetEntryType() != CommentUser) return; auto checkable (comment->GetCheckable()); @@ -2010,15 +1999,12 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - auto checkable (comment->GetCheckable()); Host::Ptr host; @@ -2079,15 +2065,12 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); @@ -2144,7 +2127,7 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change xAdd.emplace_back("id"); xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), startTime}))); - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) @@ -2176,9 +2159,6 @@ void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); @@ -2228,14 +2208,11 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str xAdd.emplace_back("id"); xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime}))); - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange) { - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); @@ -2282,7 +2259,59 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const xAdd.emplace_back(removedBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::ForwardHistoryEntries() +{ + for (;;) { + auto haystack (m_HistoryBulker.ConsumeMany()); + + if (haystack.empty()) { + if (!GetActive()) { + break; + } + + continue; + } + + uintmax_t attempts = 0; + + auto logFailure ([&haystack, &attempts](const char* err = nullptr) { + Log msg (LogNotice, "IcingaDB"); + + msg << "history: " << haystack.size() << " queries failed temporarily (attempt #" << ++attempts << ")"; + + if (err) { + msg << ": " << err; + } + }); + + for (;;) { + if (m_Rcon && m_Rcon->IsConnected()) { + try { + m_Rcon->GetResultsOfQueries(haystack, Prio::History); + break; + } catch (const std::exception& ex) { + logFailure(ex.what()); + } catch (...) { + logFailure(); + } + } else { + logFailure("not connected to Redis"); + } + + if (!GetActive()) { + Log(LogCritical, "IcingaDB") << "history: " << haystack.size() << " queries failed (attempt #" << attempts + << ") while we're about to shut down. Giving up and discarding additional " + << m_HistoryBulker.Size() << " queued history queries."; + + return; + } + + Utility::Sleep(2); + } + } } void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index d5f725b6f..0e3ae9830 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -145,6 +145,8 @@ void IcingaDB::Start(bool runtimeCreated) m_Rcon->SuppressQueryKind(Prio::CheckResult); m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); + + m_HistoryThread = std::thread([this]() { ForwardHistoryEntries(); }); } void IcingaDB::ExceptionHandler(boost::exception_ptr exp) @@ -203,6 +205,11 @@ void IcingaDB::PublishStats() void IcingaDB::Stop(bool runtimeRemoved) { + Log(LogInformation, "IcingaDB") + << "Flushing history data buffer to Redis."; + + m_HistoryThread.join(); + Log(LogInformation, "IcingaDB") << "'" << GetName() << "' stopped."; diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 91e0bb45e..62570cae1 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -5,6 +5,7 @@ #include "icingadb/icingadb-ti.hpp" #include "icingadb/redisconnection.hpp" +#include "base/bulker.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" #include "icinga/customvarobject.hpp" @@ -13,9 +14,11 @@ #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" #include +#include #include #include #include +#include #include #include @@ -111,6 +114,8 @@ private: void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void ForwardHistoryEntries(); + std::vector UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable); @@ -176,6 +181,9 @@ private: Timer::Ptr m_StatsTimer; WorkQueue m_WorkQueue{0, 1, LogNotice}; + std::thread m_HistoryThread; + Bulker m_HistoryBulker {4096, std::chrono::milliseconds(250)}; + String m_PrefixConfigObject; String m_PrefixConfigCheckSum;