From 4005d81a435829cdf5cfdd7cafa69ea12d4febc2 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Thu, 22 Apr 2021 09:15:55 +0200 Subject: [PATCH] Icinga DB: Sync state using runtime updates --- lib/icingadb/icingadb-objects.cpp | 78 ++++++++++++++++++++++--------- lib/icingadb/icingadb-utility.cpp | 2 +- lib/icingadb/icingadb.cpp | 1 - lib/icingadb/icingadb.hpp | 1 - 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 294125a33..6a1c0690d 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -218,9 +218,11 @@ void IcingaDB::UpdateAllConfigObjects() upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { std::map> hMSets; - std::vector states = {"HMSET", m_PrefixStateObject + lcType}; - std::vector runtimeUpdates; - std::vector > transaction = {{"MULTI"}}; + // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded) + std::vector states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + // Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded) + std::vector statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; + std::vector > transaction = {{"MULTI"}}; std::vector hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; auto skimObjects ([&]() { @@ -243,12 +245,19 @@ void IcingaDB::UpdateAllConfigObjects() if (lcType != GetLowerCaseTypeNameDB(object)) continue; + std::vector runtimeUpdates; CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false); // Write out inital state for checkables if (dumpState) { - states.emplace_back(GetObjectIdentifier(object)); - states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast(object)))); + String objectKey = GetObjectIdentifier(object); + Dictionary::Ptr state = SerializeState(dynamic_pointer_cast(object)); + + states.emplace_back(objectKey); + states.emplace_back(JsonEncode(state)); + + statesChksms.emplace_back(objectKey); + statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}}))); } bulkCounter++; @@ -264,7 +273,9 @@ void IcingaDB::UpdateAllConfigObjects() if (states.size() > 2) { transaction.emplace_back(std::move(states)); - states = {"HMSET", m_PrefixStateObject + lcType}; + transaction.emplace_back(std::move(statesChksms)); + states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; } hMSets = decltype(hMSets)(); @@ -524,7 +535,7 @@ std::vector IcingaDB::GetTypeOverwriteKeys(const String& type) if (type == "host" || type == "service" || type == "user") { keys.emplace_back(m_PrefixConfigObject + type + ":groupmember"); - keys.emplace_back(m_PrefixStateObject + type); + keys.emplace_back(m_PrefixConfigObject + type + ":state"); } else if (type == "timeperiod") { keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); @@ -556,7 +567,7 @@ std::vector IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type) if (type == Host::TypeInstance || type == Service::TypeInstance) { keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); - keys.emplace_back(m_PrefixStateObject + lcType); + keys.emplace_back(m_PrefixConfigObject + lcType + ":state"); } else if (type == User::TypeInstance) { keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); } else if (type == TimePeriod::TypeInstance) { @@ -1049,9 +1060,14 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable) if (!m_Rcon || !m_Rcon->IsConnected()) return; + String objectType = GetLowerCaseTypeNameDB(checkable); + String objectKey = GetObjectIdentifier(checkable); + Dictionary::Ptr stateAttrs = SerializeState(checkable); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::State); + } // Used to update a single object, used for runtime updates @@ -1063,15 +1079,22 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd String typeName = GetLowerCaseTypeNameDB(object); std::map> hMSets; - std::vector states = {"HMSET", m_PrefixStateObject + typeName}; std::vector runtimeUpdates; CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); Checkable::Ptr checkable = dynamic_pointer_cast(object); if (checkable) { String objectKey = GetObjectIdentifier(object); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State); - publishes["icinga:config:update:state:" + typeName].emplace_back(objectKey); + Dictionary::Ptr state = SerializeState(checkable); + String checksum = HashValue(state); + + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::State); + + if (runtimeUpdate) { + state->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName + ":state", state); + } } std::vector > transaction = {{"MULTI"}}; @@ -1443,14 +1466,14 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) auto checkable (dynamic_pointer_cast(object)); if (checkable) { - m_Rcon->FireAndForgetQuery( + m_Rcon->FireAndForgetQueries({ { "ZREM", dynamic_pointer_cast(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", GetObjectIdentifier(checkable) }, - Prio::CheckResult - ); + {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey}, + }, Prio::CheckResult); } } @@ -1480,19 +1503,22 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu tie(host, service) = GetHostService(checkable); - String streamname; + String redisKey; if (service) - streamname = "icinga:state:stream:service"; + redisKey = "icinga:service:state"; else - streamname = "icinga:state:stream:host"; + redisKey = "icinga:host:state"; Dictionary::Ptr objectAttrs = SerializeState(checkable); + objectAttrs->Set("redis_key", redisKey); + objectAttrs->Set("runtime_type", "upsert"); + objectAttrs->Set("checksum", HashValue(objectAttrs)); - std::vector streamadd({"XADD", streamname, "*"}); + std::vector streamadd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); ObjectLock olock(objectAttrs); for (const Dictionary::Pair& kv : objectAttrs) { streamadd.emplace_back(kv.first); - streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); + streamadd.emplace_back(IcingaToStreamValue(kv.second)); } m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); @@ -2084,7 +2110,17 @@ Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) tie(host, service) = GetHostService(checkable); - attrs->Set("id", GetObjectIdentifier(checkable));; + String id = GetObjectIdentifier(checkable); + + /* + * As there is a 1:1 relationship between host and host state, the host ID ('object_id') + * is also used as the host state ID ('id'). These are duplicated to 1) avoid having + * special handling for this in Icinga DB and 2) to have both a primary key and a foreign key + * in the SQL database in the end. In the database 'object_id' ends up as foreign key 'host_state.host_id' + * referring to 'host.id' while 'id' ends up as the primary key 'host_state.id'. This also applies for service. + */ + attrs->Set("id", id); + attrs->Set("object_id", id); attrs->Set("environment_id", m_EnvironmentId); attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard); diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp index 0210d5bd1..8ca9e298e 100644 --- a/lib/icingadb/icingadb-utility.cpp +++ b/lib/icingadb/icingadb-utility.cpp @@ -217,7 +217,7 @@ String IcingaDB::IcingaToStreamValue(const Value& value) { switch (value.GetType()) { case ValueBoolean: - return Convert::ToString((unsigned short)value); + return Convert::ToString(int(value)); case ValueString: return Utility::ValidateUTF8(value); case ValueNumber: diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 739870402..ad97ad153 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -50,7 +50,6 @@ IcingaDB::IcingaDB() m_PrefixConfigObject = "icinga:"; m_PrefixConfigCheckSum = "icinga:checksum:"; - m_PrefixStateObject = "icinga:config:state:"; } /** diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 52c0c2b77..2cdc30a16 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -155,7 +155,6 @@ private: String m_PrefixConfigObject; String m_PrefixConfigCheckSum; - String m_PrefixStateObject; bool m_ConfigDumpInProgress; bool m_ConfigDumpDone;