diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 4ffadb7a0..f99201822 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -19,6 +19,7 @@ set(base_SOURCES atomic.hpp base64.cpp base64.hpp boolean.cpp boolean.hpp boolean-script.cpp + bulker.hpp configobject.cpp configobject.hpp configobject-ti.hpp configobject-script.cpp configtype.cpp configtype.hpp configuration.cpp configuration.hpp configuration-ti.hpp diff --git a/lib/base/bulker.hpp b/lib/base/bulker.hpp new file mode 100644 index 000000000..2c30dc38b --- /dev/null +++ b/lib/base/bulker.hpp @@ -0,0 +1,119 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +#ifndef BULKER_H +#define BULKER_H + +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +/** + * A queue which outputs the input as bulks of a defined size + * or after a defined time, whichever is reached first + * + * @ingroup base + */ +template +class Bulker +{ +private: + typedef std::chrono::steady_clock Clock; + +public: + typedef std::vector Container; + typedef typename Container::size_type SizeType; + typedef typename Clock::duration Duration; + + Bulker(SizeType bulkSize, Duration threshold) + : m_BulkSize(bulkSize), m_Threshold(threshold), m_NextConsumption(NullTimePoint()) { } + + void ProduceOne(T needle); + Container ConsumeMany(); + SizeType Size(); + + inline SizeType GetBulkSize() const noexcept + { + return m_BulkSize; + } + +private: + typedef std::chrono::time_point TimePoint; + + static inline + TimePoint NullTimePoint() + { + return TimePoint::min(); + } + + inline void UpdateNextConsumption() + { + m_NextConsumption = Clock::now() + m_Threshold; + } + + const SizeType m_BulkSize; + const Duration m_Threshold; + + std::mutex m_Mutex; + std::condition_variable m_CV; + std::queue m_Bulks; + TimePoint m_NextConsumption; +}; + +template +void Bulker::ProduceOne(T needle) +{ + std::unique_lock lock (m_Mutex); + + if (m_Bulks.empty() || m_Bulks.back().size() == m_BulkSize) { + m_Bulks.emplace(); + } + + m_Bulks.back().emplace_back(std::move(needle)); + + if (m_Bulks.size() == 1u && m_Bulks.back().size() == m_BulkSize) { + m_CV.notify_one(); + } +} + +template +typename Bulker::Container Bulker::ConsumeMany() +{ + std::unique_lock lock (m_Mutex); + + if (BOOST_UNLIKELY(m_NextConsumption == NullTimePoint())) { + UpdateNextConsumption(); + } + + auto deadline (m_NextConsumption); + + m_CV.wait_until(lock, deadline, [this]() { return !m_Bulks.empty() && m_Bulks.front().size() == m_BulkSize; }); + UpdateNextConsumption(); + + if (m_Bulks.empty()) { + return Container(); + } + + auto haystack (std::move(m_Bulks.front())); + + m_Bulks.pop(); + return haystack; +} + +template +typename Bulker::SizeType Bulker::Size() +{ + std::unique_lock lock (m_Mutex); + + return m_Bulks.empty() ? 0 : (m_Bulks.size() - 1u) * m_BulkSize + m_Bulks.back().size(); +} + +} + +#endif /* BULKER_H */ diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index d3197790e..c00bd7819 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -25,7 +25,7 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc) if (availableSlots < 1) { ioEngine.m_CpuBoundSemaphore.fetch_add(1); - ioEngine.m_AlreadyExpiredTimer.async_wait(yc); + IoEngine::YieldCurrentCoroutine(yc); continue; } @@ -64,7 +64,7 @@ IoBoundWorkSlot::~IoBoundWorkSlot() if (availableSlots < 1) { ioEngine.m_CpuBoundSemaphore.fetch_add(1); - ioEngine.m_AlreadyExpiredTimer.async_wait(yc); + IoEngine::YieldCurrentCoroutine(yc); continue; } diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index dabd6730b..684d3ac39 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -119,6 +119,12 @@ public: ); } + static inline + void YieldCurrentCoroutine(boost::asio::yield_context yc) + { + Get().m_AlreadyExpiredTimer.async_wait(yc); + } + private: IoEngine(); diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index e075db183..e2272a63d 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -30,6 +30,8 @@ #include "icinga/timeperiod.hpp" #include "icinga/pluginutility.hpp" #include "remote/zone.hpp" +#include +#include #include #include #include @@ -1623,8 +1625,9 @@ unsigned short GetPreviousState(const Checkable::Ptr& checkable, const Service:: void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } Checkable::Ptr checkable = dynamic_pointer_cast(object); if (!checkable) @@ -1707,7 +1710,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendSentNotification( @@ -1715,8 +1718,9 @@ void IcingaDB::SendSentNotification( NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime ) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } Host::Ptr host; Service::Ptr service; @@ -1779,13 +1783,14 @@ void IcingaDB::SendSentNotification( xAdd.emplace_back(JsonEncode(users_notified)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } SendConfigUpdate(downtime, true); @@ -1865,13 +1870,14 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(scheduledBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } auto checkable (downtime->GetCheckable()); auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); @@ -1955,12 +1961,12 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(scheduledBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendAddedComment(const Comment::Ptr& comment) { - if (!m_Rcon || !m_Rcon->IsConnected() || comment->GetEntryType() != CommentUser) + if (comment->GetEntryType() != CommentUser || !GetActive()) return; auto checkable (comment->GetCheckable()); @@ -2010,14 +2016,15 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } auto checkable (comment->GetCheckable()); @@ -2079,14 +2086,15 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } Host::Ptr host; Service::Ptr service; @@ -2144,7 +2152,7 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change xAdd.emplace_back("id"); xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), startTime}))); - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) @@ -2176,8 +2184,9 @@ void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } Host::Ptr host; Service::Ptr service; @@ -2228,13 +2237,14 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str xAdd.emplace_back("id"); xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime}))); - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); } void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!GetActive()) { return; + } Host::Ptr host; Service::Ptr service; @@ -2282,7 +2292,79 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const xAdd.emplace_back(removedBy); } - m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::ForwardHistoryEntries() +{ + using clock = std::chrono::steady_clock; + + const std::chrono::seconds logInterval (10); + auto nextLog (clock::now() + logInterval); + + auto logPeriodically ([this, logInterval, &nextLog]() { + if (clock::now() > nextLog) { + nextLog += logInterval; + + auto size (m_HistoryBulker.Size()); + + Log(size > m_HistoryBulker.GetBulkSize() ? LogInformation : LogNotice, "IcingaDB") + << "Pending history queries: " << size; + } + }); + + for (;;) { + logPeriodically(); + + auto haystack (m_HistoryBulker.ConsumeMany()); + + if (haystack.empty()) { + if (!GetActive()) { + break; + } + + continue; + } + + uintmax_t attempts = 0; + + auto logFailure ([&haystack, &attempts](const char* err = nullptr) { + Log msg (LogNotice, "IcingaDB"); + + msg << "history: " << haystack.size() << " queries failed temporarily (attempt #" << ++attempts << ")"; + + if (err) { + msg << ": " << err; + } + }); + + for (;;) { + logPeriodically(); + + if (m_Rcon && m_Rcon->IsConnected()) { + try { + m_Rcon->GetResultsOfQueries(haystack, Prio::History); + break; + } catch (const std::exception& ex) { + logFailure(ex.what()); + } catch (...) { + logFailure(); + } + } else { + logFailure("not connected to Redis"); + } + + if (!GetActive()) { + Log(LogCritical, "IcingaDB") << "history: " << haystack.size() << " queries failed (attempt #" << attempts + << ") while we're about to shut down. Giving up and discarding additional " + << m_HistoryBulker.Size() << " queued history queries."; + + return; + } + + Utility::Sleep(2); + } + } } void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index d5f725b6f..1adcaee84 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -7,6 +7,8 @@ #include "remote/eventqueue.hpp" #include "base/configuration.hpp" #include "base/json.hpp" +#include "base/perfdatavalue.hpp" +#include "base/statsfunction.hpp" #include "base/tlsutility.hpp" #include "base/utility.hpp" #include "icinga/checkable.hpp" @@ -27,6 +29,8 @@ std::once_flag IcingaDB::m_EnvironmentIdOnce; REGISTER_TYPE(IcingaDB); +REGISTER_STATSFUNCTION(IcingaDB, &IcingaDB::StatsFunc); + IcingaDB::IcingaDB() : m_Rcon(nullptr) { @@ -38,6 +42,28 @@ IcingaDB::IcingaDB() m_PrefixConfigCheckSum = "icinga:checksum:"; } +/** + * Feature stats interface + * + * @param status Key value pairs for feature stats + */ +void IcingaDB::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + + for (auto& icingadb : ConfigType::GetObjectsByType()) { + auto historyBufferItems (icingadb->m_HistoryBulker.Size()); + + nodes.emplace_back(icingadb->GetName(), new Dictionary({ + { "history_buffer_items", historyBufferItems } + })); + + perfdata->Add(new PerfdataValue("icingadb_" + icingadb->GetName() + "_history_buffer_items", historyBufferItems)); + } + + status->Set("icingadb", new Dictionary(std::move(nodes))); +} + void IcingaDB::Validate(int types, const ValidationUtils& utils) { ObjectImpl::Validate(types, utils); @@ -145,6 +171,10 @@ void IcingaDB::Start(bool runtimeCreated) m_Rcon->SuppressQueryKind(Prio::CheckResult); m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); + + Ptr keepAlive (this); + + m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); }); } void IcingaDB::ExceptionHandler(boost::exception_ptr exp) @@ -203,6 +233,15 @@ void IcingaDB::PublishStats() void IcingaDB::Stop(bool runtimeRemoved) { + Log(LogInformation, "IcingaDB") + << "Flushing history data buffer to Redis."; + + if (m_HistoryThread.wait_for(std::chrono::minutes(1)) == std::future_status::timeout) { + Log(LogCritical, "IcingaDB") + << "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding " + << m_HistoryBulker.Size() << " queued history queries."; + } + Log(LogInformation, "IcingaDB") << "'" << GetName() << "' stopped."; diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 91e0bb45e..785cd01ba 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -5,6 +5,7 @@ #include "icingadb/icingadb-ti.hpp" #include "icingadb/redisconnection.hpp" +#include "base/bulker.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" #include "icinga/customvarobject.hpp" @@ -13,6 +14,8 @@ #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" #include +#include +#include #include #include #include @@ -34,6 +37,7 @@ public: IcingaDB(); static void ConfigStaticInitialize(); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); void Validate(int types, const ValidationUtils& utils) override; virtual void Start(bool runtimeCreated) override; @@ -111,6 +115,8 @@ private: void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void ForwardHistoryEntries(); + std::vector UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable); @@ -176,6 +182,9 @@ private: Timer::Ptr m_StatsTimer; WorkQueue m_WorkQueue{0, 1, LogNotice}; + std::future m_HistoryThread; + Bulker m_HistoryBulker {4096, std::chrono::milliseconds(250)}; + String m_PrefixConfigObject; String m_PrefixConfigCheckSum; diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index 7537f0271..f2ae3fa7b 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -262,6 +262,12 @@ void RedisConnection::Connect(asio::yield_context& yc) boost::asio::deadline_timer timer (m_Strand.context()); + auto waitForReadLoop ([this, &yc]() { + while (!m_Queues.FutureResponseActions.empty()) { + IoEngine::YieldCurrentCoroutine(yc); + } + }); + for (;;) { try { if (m_Path.IsEmpty()) { @@ -294,6 +300,7 @@ void RedisConnection::Connect(asio::yield_context& yc) } Handshake(conn, yc); + waitForReadLoop(); m_TlsConn = std::move(conn); } else { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") @@ -305,6 +312,7 @@ void RedisConnection::Connect(asio::yield_context& yc) icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); Handshake(conn, yc); + waitForReadLoop(); m_TcpConn = std::move(conn); } } else { @@ -317,6 +325,7 @@ void RedisConnection::Connect(asio::yield_context& yc) conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); Handshake(conn, yc); + waitForReadLoop(); m_UnixConn = std::move(conn); } @@ -413,12 +422,16 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) throw; } catch (...) { promise.set_exception(std::current_exception()); - - continue; + break; } } - promise.set_value(std::move(replies)); + try { + promise.set_value(std::move(replies)); + } catch (const std::future_error&) { + // Complaint about the above op is not allowed + // due to promise.set_exception() was already called + } } } }