diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 5793f9762..86cdd2744 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -217,9 +217,12 @@ void IcingaDB::UpdateAllConfigObjects() std::mutex ourContentMutex; upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { - std::map> hMSets, publishes; - std::vector states = {"HMSET", m_PrefixStateObject + lcType}; - std::vector > transaction = {{"MULTI"}}; + std::map> hMSets; + // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded) + std::vector states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + // Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded) + std::vector statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; + std::vector > transaction = {{"MULTI"}}; std::vector hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; auto skimObjects ([&]() { @@ -242,12 +245,19 @@ void IcingaDB::UpdateAllConfigObjects() if (lcType != GetLowerCaseTypeNameDB(object)) continue; - CreateConfigUpdate(object, lcType, hMSets, publishes, false); + std::vector runtimeUpdates; + CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false); // Write out inital state for checkables if (dumpState) { - states.emplace_back(GetObjectIdentifier(object)); - states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast(object)))); + String objectKey = GetObjectIdentifier(object); + Dictionary::Ptr state = SerializeState(dynamic_pointer_cast(object)); + + states.emplace_back(objectKey); + states.emplace_back(JsonEncode(state)); + + statesChksms.emplace_back(objectKey); + statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}}))); } bulkCounter++; @@ -263,24 +273,12 @@ void IcingaDB::UpdateAllConfigObjects() if (states.size() > 2) { transaction.emplace_back(std::move(states)); - states = {"HMSET", m_PrefixStateObject + lcType}; - } - - for (auto& kv : publishes) { - for (auto& message : kv.second) { - std::vector publish; - - publish.reserve(3); - publish.emplace_back("PUBLISH"); - publish.emplace_back(kv.first); - publish.emplace_back(std::move(message)); - - transaction.emplace_back(std::move(publish)); - } + transaction.emplace_back(std::move(statesChksms)); + states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; } hMSets = decltype(hMSets)(); - publishes = decltype(publishes)(); if (transaction.size() > 1) { transaction.push_back({"EXEC"}); @@ -319,19 +317,6 @@ void IcingaDB::UpdateAllConfigObjects() if (states.size() > 2) transaction.emplace_back(std::move(states)); - for (auto& kv : publishes) { - for (auto& message : kv.second) { - std::vector publish; - - publish.reserve(3); - publish.emplace_back("PUBLISH"); - publish.emplace_back(kv.first); - publish.emplace_back(std::move(message)); - - transaction.emplace_back(std::move(publish)); - } - } - if (transaction.size() > 1) { transaction.push_back({"EXEC"}); m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); @@ -549,8 +534,8 @@ std::vector IcingaDB::GetTypeOverwriteKeys(const String& type) }; if (type == "host" || type == "service" || type == "user") { - keys.emplace_back(m_PrefixConfigObject + type + ":groupmember"); - keys.emplace_back(m_PrefixStateObject + type); + keys.emplace_back(m_PrefixConfigObject + type + "group:member"); + keys.emplace_back(m_PrefixConfigObject + type + ":state"); } else if (type == "timeperiod") { keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); @@ -581,10 +566,10 @@ std::vector IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type) } if (type == Host::TypeInstance || type == Service::TypeInstance) { - keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); - keys.emplace_back(m_PrefixStateObject + lcType); + keys.emplace_back(m_PrefixConfigObject + lcType + "group:member"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":state"); } else if (type == User::TypeInstance) { - keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); + keys.emplace_back(m_PrefixConfigObject + lcType + "group:member"); } else if (type == TimePeriod::TypeInstance) { keys.emplace_back(m_PrefixConfigObject + lcType + ":override:include"); keys.emplace_back(m_PrefixConfigObject + lcType + ":override:exclude"); @@ -610,7 +595,7 @@ static ConfigObject::Ptr GetObjectByName(const String& name) } void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, - std::map>& publishes, bool runtimeUpdate) + std::vector& runtimeUpdates, bool runtimeUpdate) { String objectKey = GetObjectIdentifier(object); CustomVarObject::Ptr customVarObject = dynamic_pointer_cast(object); @@ -631,18 +616,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S if (runtimeUpdate || m_DumpedGlobals.CustomVar.IsNew(kv.first)) { allCvs.emplace_back(kv.first); allCvs.emplace_back(JsonEncode(kv.second)); - } - if (runtimeUpdate) { - publishes["icinga:config:update:customvar"].emplace_back(kv.first); + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, kv.first, m_PrefixConfigObject + "customvar", kv.second); + } } String id = HashValue(new Array(Prepend(env, Prepend(kv.first, GetObjectIdentifiersWithoutEnv(object))))); typeCvs.emplace_back(id); - typeCvs.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}}))); + + Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}}); + typeCvs.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":customvar"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":customvar", data); } } } @@ -662,11 +649,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S if (runtimeUpdate || m_DumpedGlobals.ActionUrl.IsNew(id)) { actionUrls.emplace_back(std::move(id)); - actionUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}}))); - } + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}}); + actionUrls.emplace_back(JsonEncode(data)); - if (runtimeUpdate) { - publishes["icinga:config:update:action_url"].emplace_back(actionUrls.at(actionUrls.size() - 2u)); + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, actionUrls.at(actionUrls.size() - 2u), m_PrefixConfigObject + "action_url", data); + } } } if (!notesUrl.IsEmpty()) { @@ -676,11 +664,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S if (runtimeUpdate || m_DumpedGlobals.NotesUrl.IsNew(id)) { notesUrls.emplace_back(std::move(id)); - notesUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}}))); - } + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}}); + notesUrls.emplace_back(JsonEncode(data)); - if (runtimeUpdate) { - publishes["icinga:config:update:notes_url"].emplace_back(notesUrls.at(notesUrls.size() - 2u)); + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, notesUrls.at(notesUrls.size() - 2u), m_PrefixConfigObject + "notes_url", data); + } } } if (!iconImage.IsEmpty()) { @@ -690,11 +679,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S if (runtimeUpdate || m_DumpedGlobals.IconImage.IsNew(id)) { iconImages.emplace_back(std::move(id)); - iconImages.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}}))); - } + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}}); + iconImages.emplace_back(JsonEncode(data)); - if (runtimeUpdate) { - publishes["icinga:config:update:icon_image"].emplace_back(iconImages.at(iconImages.size() - 2u)); + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, iconImages.at(iconImages.size() - 2u), m_PrefixConfigObject + "icon_image", data); + } } } @@ -718,17 +708,18 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S groupIds->Reserve(groups->GetLength()); - auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]); for (auto& group : groups) { auto groupObj ((*getGroup)(group)); String groupId = GetObjectIdentifier(groupObj); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object))))); members.emplace_back(id); - members.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}))); + Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}); + members.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data); } groupIds->Add(groupId); @@ -755,10 +746,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String id = HashValue(new Array(Prepend(env, Prepend(kv.first, Prepend(kv.second, GetObjectIdentifiersWithoutEnv(object)))))); typeRanges.emplace_back(id); - typeRanges.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}}))); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}}); + typeRanges.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":range"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":range", data); } } } @@ -784,10 +776,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(includeTp), GetObjectIdentifiersWithoutEnv(object))))); includs.emplace_back(id); - includs.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}}))); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}}); + includs.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":override:include"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:include", data); } } @@ -813,10 +806,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(excludeTp), GetObjectIdentifiersWithoutEnv(object))))); excluds.emplace_back(id); - excluds.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}}))); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}}); + excluds.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":override:exclude"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:exclude", data); } } @@ -836,10 +830,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S for (auto& parent : parentsRaw) { String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(parent), GetObjectIdentifiersWithoutEnv(object))))); parnts.emplace_back(id); - parnts.emplace_back(JsonEncode(new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}}))); + Dictionary::Ptr data = new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}}); + parnts.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":parent"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":parent", data); } parents->Add(GetObjectIdentifier(parent)); @@ -863,17 +858,18 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S groupIds->Reserve(groups->GetLength()); - auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]); for (auto& group : groups) { auto groupObj ((*getGroup)(group)); String groupId = GetObjectIdentifier(groupObj); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object))))); members.emplace_back(id); - members.emplace_back(JsonEncode(new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}))); + Dictionary::Ptr data = new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}); + members.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data); } groupIds->Add(groupId); @@ -904,10 +900,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String userId = GetObjectIdentifier(user); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object))))); usrs.emplace_back(id); - usrs.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}))); + Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}); + usrs.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":user"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":user", data); } userIds->Add(userId); @@ -926,13 +923,16 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String id = HashValue(new Array(Prepend(env, Prepend("usergroup", Prepend(GetObjectIdentifiersWithoutEnv(usergroup), GetObjectIdentifiersWithoutEnv(object)))))); groups.emplace_back(id); - groups.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}))); + Dictionary::Ptr groupData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}); + groups.emplace_back(JsonEncode(groupData)); notificationRecipients.emplace_back(id); - notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}))); + Dictionary::Ptr notificationRecipientData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}); + notificationRecipients.emplace_back(JsonEncode(notificationRecipientData)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":usergroup"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":usergroup", groupData); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", notificationRecipientData); } usergroupIds->Add(usergroupId); @@ -942,10 +942,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S String userId = GetObjectIdentifier(user); String id = HashValue(new Array(Prepend(env, Prepend("user", Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object)))))); notificationRecipients.emplace_back(id); - notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}))); + Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}); + notificationRecipients.emplace_back(JsonEncode(data)); if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":recipient"].emplace_back(id); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", data); } } @@ -990,12 +991,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S typeArgs.emplace_back(id); typeArgs.emplace_back(JsonEncode(values)); - if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":argument"].emplace_back(id); - } - argChksms.emplace_back(id); - argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); + String checksum = HashValue(kv.second); + argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); + + if (runtimeUpdate) { + values->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":argument", values); + } } } @@ -1037,12 +1040,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S typeVars.emplace_back(id); typeVars.emplace_back(JsonEncode(values)); - if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName + ":envvar"].emplace_back(id); - } - varChksms.emplace_back(id); - varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); + String checksum = HashValue(kv.second); + varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); + + if (runtimeUpdate) { + values->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":envvar", values); + } } } @@ -1055,9 +1060,14 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable) if (!m_Rcon || !m_Rcon->IsConnected()) return; + String objectType = GetLowerCaseTypeNameDB(checkable); + String objectKey = GetObjectIdentifier(checkable); + Dictionary::Ptr stateAttrs = SerializeState(checkable); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::State); + } // Used to update a single object, used for runtime updates @@ -1068,15 +1078,23 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd String typeName = GetLowerCaseTypeNameDB(object); - std::map> hMSets, publishes; - std::vector states = {"HMSET", m_PrefixStateObject + typeName}; + std::map> hMSets; + std::vector runtimeUpdates; - CreateConfigUpdate(object, typeName, hMSets, publishes, runtimeUpdate); + CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); Checkable::Ptr checkable = dynamic_pointer_cast(object); if (checkable) { String objectKey = GetObjectIdentifier(object); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State); - publishes["icinga:config:update:state:" + typeName].emplace_back(objectKey); + Dictionary::Ptr state = SerializeState(checkable); + String checksum = HashValue(state); + + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::State); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::State); + + if (runtimeUpdate) { + state->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName + ":state", state); + } } std::vector > transaction = {{"MULTI"}}; @@ -1088,17 +1106,19 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd } } - for (auto& kv : publishes) { - for (auto& message : kv.second) { - std::vector publish; + for (auto& objectAttributes : runtimeUpdates) { + std::vector xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); + ObjectLock olock(objectAttributes); - publish.reserve(3); - publish.emplace_back("PUBLISH"); - publish.emplace_back(kv.first); - publish.emplace_back(std::move(message)); - - transaction.emplace_back(std::move(publish)); + for (const Dictionary::Pair& kv : objectAttributes) { + String value = IcingaToStreamValue(kv.second); + if (!value.IsEmpty()) { + xAdd.emplace_back(kv.first); + xAdd.emplace_back(value); + } } + + transaction.emplace_back(std::move(xAdd)); } if (transaction.size() > 1) { @@ -1111,6 +1131,15 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd } } +void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector& runtimeUpdates, const String& objectKey, + const String& redisKey, const Dictionary::Ptr& data) +{ + data->Set("id", objectKey); + data->Set("redis_key", redisKey); + data->Set("runtime_type", "upsert"); + runtimeUpdates.emplace_back(data); +} + // Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant // for IcingaDB. bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums) @@ -1385,7 +1414,7 @@ bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& a */ void IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, - std::map>& publishes, bool runtimeUpdate) + std::vector& runtimeUpdates, bool runtimeUpdate) { /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. if (!runtimeUpdate && m_ConfigDumpInProgress) @@ -1401,7 +1430,7 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN if (!PrepareObject(object, attr, chksm)) return; - InsertObjectDependencies(object, typeName, hMSets, publishes, runtimeUpdate); + InsertObjectDependencies(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); String objectKey = GetObjectIdentifier(object); auto& attrs (hMSets[m_PrefixConfigObject + typeName]); @@ -1410,12 +1439,14 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN attrs.emplace_back(objectKey); attrs.emplace_back(JsonEncode(attr)); + String checksum = HashValue(attr); chksms.emplace_back(objectKey); - chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(attr)}}))); + chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); /* Send an update event to subscribers. */ if (runtimeUpdate) { - publishes["icinga:config:update:" + typeName].emplace_back(objectKey); + attr->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName, attr); } } @@ -1425,22 +1456,24 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) String objectKey = GetObjectIdentifier(object); m_Rcon->FireAndForgetQueries({ - {"HDEL", m_PrefixConfigObject + typeName, objectKey}, - {"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, - {"PUBLISH", "icinga:config:delete:" + typeName, objectKey} - }, Prio::Config); + {"HDEL", m_PrefixConfigObject + typeName, objectKey}, + { + "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*", + "redis_key", m_PrefixConfigObject + typeName, "id", objectKey, "runtime_type", "delete" + } + }, Prio::Config); auto checkable (dynamic_pointer_cast(object)); if (checkable) { - m_Rcon->FireAndForgetQuery( + m_Rcon->FireAndForgetQueries({ { "ZREM", dynamic_pointer_cast(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", GetObjectIdentifier(checkable) }, - Prio::CheckResult - ); + {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey}, + }, Prio::CheckResult); } } @@ -1470,19 +1503,22 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu tie(host, service) = GetHostService(checkable); - String streamname; + String redisKey; if (service) - streamname = "icinga:state:stream:service"; + redisKey = "icinga:service:state"; else - streamname = "icinga:state:stream:host"; + redisKey = "icinga:host:state"; Dictionary::Ptr objectAttrs = SerializeState(checkable); + objectAttrs->Set("redis_key", redisKey); + objectAttrs->Set("runtime_type", "upsert"); + objectAttrs->Set("checksum", HashValue(objectAttrs)); - std::vector streamadd({"XADD", streamname, "*"}); + std::vector streamadd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); ObjectLock olock(objectAttrs); for (const Dictionary::Pair& kv : objectAttrs) { streamadd.emplace_back(kv.first); - streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); + streamadd.emplace_back(IcingaToStreamValue(kv.second)); } m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); @@ -2074,7 +2110,17 @@ Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) tie(host, service) = GetHostService(checkable); - attrs->Set("id", GetObjectIdentifier(checkable));; + String id = GetObjectIdentifier(checkable); + + /* + * As there is a 1:1 relationship between host and host state, the host ID ('object_id') + * is also used as the host state ID ('id'). These are duplicated to 1) avoid having + * special handling for this in Icinga DB and 2) to have both a primary key and a foreign key + * in the SQL database in the end. In the database 'object_id' ends up as foreign key 'host_state.host_id' + * referring to 'host.id' while 'id' ends up as the primary key 'host_state.id'. This also applies for service. + */ + attrs->Set("id", id); + attrs->Set("object_id", id); attrs->Set("environment_id", m_EnvironmentId); attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard); diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp index c9650cefc..8ca9e298e 100644 --- a/lib/icingadb/icingadb-utility.cpp +++ b/lib/icingadb/icingadb-utility.cpp @@ -212,3 +212,18 @@ String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj) long long IcingaDB::TimestampToMilliseconds(double timestamp) { return static_cast(timestamp * 1000); } + +String IcingaDB::IcingaToStreamValue(const Value& value) +{ + switch (value.GetType()) { + case ValueBoolean: + return Convert::ToString(int(value)); + case ValueString: + return Utility::ValidateUTF8(value); + case ValueNumber: + case ValueEmpty: + return Convert::ToString(value); + default: + return JsonEncode(value); + } +} diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index e751ae264..ad97ad153 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -48,9 +48,8 @@ IcingaDB::IcingaDB() m_WorkQueue.SetName("IcingaDB"); - m_PrefixConfigObject = "icinga:config:"; + m_PrefixConfigObject = "icinga:"; m_PrefixConfigCheckSum = "icinga:checksum:"; - m_PrefixStateObject = "icinga:config:state:"; } /** diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index ec3ebe4ea..2cdc30a16 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -61,13 +61,15 @@ private: std::vector GetTypeOverwriteKeys(const String& type); std::vector GetTypeDumpSignalKeys(const Type::Ptr& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, - std::map>& publishes, bool runtimeUpdate); + std::vector& runtimeUpdates, bool runtimeUpdate); void UpdateState(const Checkable::Ptr& checkable); void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map>& hMSets, - std::map>& publishes, bool runtimeUpdate); + std::vector& runtimeUpdates, bool runtimeUpdate); void SendConfigDelete(const ConfigObject::Ptr& object); void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); + void AddObjectDataToRuntimeUpdates(std::vector& runtimeUpdates, const String& objectKey, + const String& redisKey, const Dictionary::Ptr& data); void SendSentNotification( const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set& users, @@ -93,6 +95,7 @@ private: static String FormatCheckSumBinary(const String& str); static String FormatCommandLine(const Value& commandLine); static long long TimestampToMilliseconds(double timestamp); + static String IcingaToStreamValue(const Value& value); static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object); static String GetObjectIdentifier(const ConfigObject::Ptr& object); @@ -152,7 +155,6 @@ private: String m_PrefixConfigObject; String m_PrefixConfigCheckSum; - String m_PrefixStateObject; bool m_ConfigDumpInProgress; bool m_ConfigDumpDone;