From 8a30657ce95964ea65fce55359fe08071e2d446d Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 21 Jul 2021 14:24:28 +0200 Subject: [PATCH 1/2] Icinga DB: write state updates to icinga:runtime:state ... allowing the Go daemon to priorize state updates. --- lib/icingadb/icingadb-objects.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index f9c2623c2..91e534a1b 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -1532,7 +1532,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu objectAttrs->Set("runtime_type", "upsert"); objectAttrs->Set("checksum", HashValue(objectAttrs)); - std::vector streamadd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); + std::vector streamadd({"XADD", "icinga:runtime:state", "MAXLEN", "~", "1000000", "*"}); ObjectLock olock(objectAttrs); for (const Dictionary::Pair& kv : objectAttrs) { streamadd.emplace_back(kv.first); From ae9b3711287ad97efa57e8a2916c874aeb219b75 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 26 Jul 2021 16:39:18 +0200 Subject: [PATCH 2/2] Icinga DB: priorize state > config I.e. do the following in parallel (highest priority first): * Stream state changes to icinga:runtime:state * Sync config and initial state, then let queued runtime updates to the just synced state pass --- lib/icingadb/icingadb-objects.cpp | 14 +++++++------- lib/icingadb/icingadb.cpp | 2 +- lib/icingadb/redisconnection.hpp | 5 +++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 91e534a1b..2fe33d05a 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -136,10 +136,10 @@ void IcingaDB::UpdateAllConfigObjects() std::vector types = GetTypes(); m_Rcon->SuppressQueryKind(Prio::CheckResult); - m_Rcon->SuppressQueryKind(Prio::State); + m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); Defer unSuppress ([this]() { - m_Rcon->UnsuppressQueryKind(Prio::State); + m_Rcon->UnsuppressQueryKind(Prio::RuntimeStateSync); m_Rcon->UnsuppressQueryKind(Prio::CheckResult); }); @@ -1078,8 +1078,8 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable) Dictionary::Ptr stateAttrs = SerializeState(checkable); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::State); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::RuntimeStateSync); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::RuntimeStateSync); } @@ -1101,8 +1101,8 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd Dictionary::Ptr state = SerializeState(checkable); String checksum = HashValue(state); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::State); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::RuntimeStateSync); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::RuntimeStateSync); if (runtimeUpdate) { state->Set("checksum", checksum); @@ -1539,7 +1539,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu streamadd.emplace_back(IcingaToStreamValue(kv.second)); } - m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); + m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream); int hard_state; if (!cr) { diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 7b05364d1..185796c8b 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -76,7 +76,7 @@ void IcingaDB::Start(bool runtimeCreated) m_WorkQueue.SetName("IcingaDB"); m_Rcon->SuppressQueryKind(Prio::CheckResult); - m_Rcon->SuppressQueryKind(Prio::State); + m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); } void IcingaDB::ExceptionHandler(boost::exception_ptr exp) diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 9c7aaa880..c3a484e82 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -62,8 +62,9 @@ namespace icinga enum class QueryPriority : unsigned char { Heartbeat, - Config, - State, + RuntimeStateStream, // runtime state updates, doesn't affect initially synced states + Config, // includes initially synced states + RuntimeStateSync, // updates initially synced states at runtime, in parallel to config dump, therefore must be < Config History, CheckResult, SyncConnection = 255