Merge pull request #8733 from Icinga/feature/icingadb-runtime-updates-via-streams

Icinga DB runtime updates and state via streams
This commit is contained in:
Noah Hilverling 2021-05-07 14:38:23 +02:00 committed by GitHub
commit a8f98cf721
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 125 deletions

View File

@ -217,9 +217,12 @@ void IcingaDB::UpdateAllConfigObjects()
std::mutex ourContentMutex; std::mutex ourContentMutex;
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> hMSets, publishes; std::map<String, std::vector<String>> hMSets;
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType}; // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded)
std::vector<std::vector<String> > transaction = {{"MULTI"}}; std::vector<String> states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
// Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded)
std::vector<String> statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
auto skimObjects ([&]() { auto skimObjects ([&]() {
@ -242,12 +245,19 @@ void IcingaDB::UpdateAllConfigObjects()
if (lcType != GetLowerCaseTypeNameDB(object)) if (lcType != GetLowerCaseTypeNameDB(object))
continue; continue;
CreateConfigUpdate(object, lcType, hMSets, publishes, false); std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false);
// Write out inital state for checkables // Write out inital state for checkables
if (dumpState) { if (dumpState) {
states.emplace_back(GetObjectIdentifier(object)); String objectKey = GetObjectIdentifier(object);
states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast<Checkable>(object)))); Dictionary::Ptr state = SerializeState(dynamic_pointer_cast<Checkable>(object));
states.emplace_back(objectKey);
states.emplace_back(JsonEncode(state));
statesChksms.emplace_back(objectKey);
statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}})));
} }
bulkCounter++; bulkCounter++;
@ -263,24 +273,12 @@ void IcingaDB::UpdateAllConfigObjects()
if (states.size() > 2) { if (states.size() > 2) {
transaction.emplace_back(std::move(states)); transaction.emplace_back(std::move(states));
states = {"HMSET", m_PrefixStateObject + lcType}; transaction.emplace_back(std::move(statesChksms));
} states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
for (auto& kv : publishes) {
for (auto& message : kv.second) {
std::vector<String> publish;
publish.reserve(3);
publish.emplace_back("PUBLISH");
publish.emplace_back(kv.first);
publish.emplace_back(std::move(message));
transaction.emplace_back(std::move(publish));
}
} }
hMSets = decltype(hMSets)(); hMSets = decltype(hMSets)();
publishes = decltype(publishes)();
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
@ -319,19 +317,6 @@ void IcingaDB::UpdateAllConfigObjects()
if (states.size() > 2) if (states.size() > 2)
transaction.emplace_back(std::move(states)); transaction.emplace_back(std::move(states));
for (auto& kv : publishes) {
for (auto& message : kv.second) {
std::vector<String> publish;
publish.reserve(3);
publish.emplace_back("PUBLISH");
publish.emplace_back(kv.first);
publish.emplace_back(std::move(message));
transaction.emplace_back(std::move(publish));
}
}
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
@ -549,8 +534,8 @@ std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
}; };
if (type == "host" || type == "service" || type == "user") { if (type == "host" || type == "service" || type == "user") {
keys.emplace_back(m_PrefixConfigObject + type + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + type + "group:member");
keys.emplace_back(m_PrefixStateObject + type); keys.emplace_back(m_PrefixConfigObject + type + ":state");
} else if (type == "timeperiod") { } else if (type == "timeperiod") {
keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); keys.emplace_back(m_PrefixConfigObject + type + ":override:include");
keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude");
@ -581,10 +566,10 @@ std::vector<String> IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type)
} }
if (type == Host::TypeInstance || type == Service::TypeInstance) { if (type == Host::TypeInstance || type == Service::TypeInstance) {
keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + lcType + "group:member");
keys.emplace_back(m_PrefixStateObject + lcType); keys.emplace_back(m_PrefixConfigObject + lcType + ":state");
} else if (type == User::TypeInstance) { } else if (type == User::TypeInstance) {
keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + lcType + "group:member");
} else if (type == TimePeriod::TypeInstance) { } else if (type == TimePeriod::TypeInstance) {
keys.emplace_back(m_PrefixConfigObject + lcType + ":override:include"); keys.emplace_back(m_PrefixConfigObject + lcType + ":override:include");
keys.emplace_back(m_PrefixConfigObject + lcType + ":override:exclude"); keys.emplace_back(m_PrefixConfigObject + lcType + ":override:exclude");
@ -610,7 +595,7 @@ static ConfigObject::Ptr GetObjectByName(const String& name)
} }
void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate) std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
{ {
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object); CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object);
@ -631,18 +616,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
if (runtimeUpdate || m_DumpedGlobals.CustomVar.IsNew(kv.first)) { if (runtimeUpdate || m_DumpedGlobals.CustomVar.IsNew(kv.first)) {
allCvs.emplace_back(kv.first); allCvs.emplace_back(kv.first);
allCvs.emplace_back(JsonEncode(kv.second)); allCvs.emplace_back(JsonEncode(kv.second));
}
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:customvar"].emplace_back(kv.first); AddObjectDataToRuntimeUpdates(runtimeUpdates, kv.first, m_PrefixConfigObject + "customvar", kv.second);
}
} }
String id = HashValue(new Array(Prepend(env, Prepend(kv.first, GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(kv.first, GetObjectIdentifiersWithoutEnv(object)))));
typeCvs.emplace_back(id); typeCvs.emplace_back(id);
typeCvs.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}})));
Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}});
typeCvs.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":customvar"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":customvar", data);
} }
} }
} }
@ -662,11 +649,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
if (runtimeUpdate || m_DumpedGlobals.ActionUrl.IsNew(id)) { if (runtimeUpdate || m_DumpedGlobals.ActionUrl.IsNew(id)) {
actionUrls.emplace_back(std::move(id)); actionUrls.emplace_back(std::move(id));
actionUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}});
} actionUrls.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:action_url"].emplace_back(actionUrls.at(actionUrls.size() - 2u)); AddObjectDataToRuntimeUpdates(runtimeUpdates, actionUrls.at(actionUrls.size() - 2u), m_PrefixConfigObject + "action_url", data);
}
} }
} }
if (!notesUrl.IsEmpty()) { if (!notesUrl.IsEmpty()) {
@ -676,11 +664,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
if (runtimeUpdate || m_DumpedGlobals.NotesUrl.IsNew(id)) { if (runtimeUpdate || m_DumpedGlobals.NotesUrl.IsNew(id)) {
notesUrls.emplace_back(std::move(id)); notesUrls.emplace_back(std::move(id));
notesUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}});
} notesUrls.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:notes_url"].emplace_back(notesUrls.at(notesUrls.size() - 2u)); AddObjectDataToRuntimeUpdates(runtimeUpdates, notesUrls.at(notesUrls.size() - 2u), m_PrefixConfigObject + "notes_url", data);
}
} }
} }
if (!iconImage.IsEmpty()) { if (!iconImage.IsEmpty()) {
@ -690,11 +679,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
if (runtimeUpdate || m_DumpedGlobals.IconImage.IsNew(id)) { if (runtimeUpdate || m_DumpedGlobals.IconImage.IsNew(id)) {
iconImages.emplace_back(std::move(id)); iconImages.emplace_back(std::move(id));
iconImages.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}});
} iconImages.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:icon_image"].emplace_back(iconImages.at(iconImages.size() - 2u)); AddObjectDataToRuntimeUpdates(runtimeUpdates, iconImages.at(iconImages.size() - 2u), m_PrefixConfigObject + "icon_image", data);
}
} }
} }
@ -718,17 +708,18 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
groupIds->Reserve(groups->GetLength()); groupIds->Reserve(groups->GetLength());
auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]);
for (auto& group : groups) { for (auto& group : groups) {
auto groupObj ((*getGroup)(group)); auto groupObj ((*getGroup)(group));
String groupId = GetObjectIdentifier(groupObj); String groupId = GetObjectIdentifier(groupObj);
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object)))));
members.emplace_back(id); members.emplace_back(id);
members.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}))); Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}});
members.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data);
} }
groupIds->Add(groupId); groupIds->Add(groupId);
@ -755,10 +746,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String id = HashValue(new Array(Prepend(env, Prepend(kv.first, Prepend(kv.second, GetObjectIdentifiersWithoutEnv(object)))))); String id = HashValue(new Array(Prepend(env, Prepend(kv.first, Prepend(kv.second, GetObjectIdentifiersWithoutEnv(object))))));
typeRanges.emplace_back(id); typeRanges.emplace_back(id);
typeRanges.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}});
typeRanges.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":range"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":range", data);
} }
} }
} }
@ -784,10 +776,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(includeTp), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(includeTp), GetObjectIdentifiersWithoutEnv(object)))));
includs.emplace_back(id); includs.emplace_back(id);
includs.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}});
includs.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":override:include"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:include", data);
} }
} }
@ -813,10 +806,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(excludeTp), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(excludeTp), GetObjectIdentifiersWithoutEnv(object)))));
excluds.emplace_back(id); excluds.emplace_back(id);
excluds.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}}))); Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}});
excluds.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":override:exclude"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:exclude", data);
} }
} }
@ -836,10 +830,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
for (auto& parent : parentsRaw) { for (auto& parent : parentsRaw) {
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(parent), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(parent), GetObjectIdentifiersWithoutEnv(object)))));
parnts.emplace_back(id); parnts.emplace_back(id);
parnts.emplace_back(JsonEncode(new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}}))); Dictionary::Ptr data = new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}});
parnts.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":parent"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":parent", data);
} }
parents->Add(GetObjectIdentifier(parent)); parents->Add(GetObjectIdentifier(parent));
@ -863,17 +858,18 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
groupIds->Reserve(groups->GetLength()); groupIds->Reserve(groups->GetLength());
auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]);
for (auto& group : groups) { for (auto& group : groups) {
auto groupObj ((*getGroup)(group)); auto groupObj ((*getGroup)(group));
String groupId = GetObjectIdentifier(groupObj); String groupId = GetObjectIdentifier(groupObj);
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object)))));
members.emplace_back(id); members.emplace_back(id);
members.emplace_back(JsonEncode(new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}}))); Dictionary::Ptr data = new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}});
members.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data);
} }
groupIds->Add(groupId); groupIds->Add(groupId);
@ -904,10 +900,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String userId = GetObjectIdentifier(user); String userId = GetObjectIdentifier(user);
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object))))); String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object)))));
usrs.emplace_back(id); usrs.emplace_back(id);
usrs.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}))); Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}});
usrs.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":user"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":user", data);
} }
userIds->Add(userId); userIds->Add(userId);
@ -926,13 +923,16 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String id = HashValue(new Array(Prepend(env, Prepend("usergroup", Prepend(GetObjectIdentifiersWithoutEnv(usergroup), GetObjectIdentifiersWithoutEnv(object)))))); String id = HashValue(new Array(Prepend(env, Prepend("usergroup", Prepend(GetObjectIdentifiersWithoutEnv(usergroup), GetObjectIdentifiersWithoutEnv(object))))));
groups.emplace_back(id); groups.emplace_back(id);
groups.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}))); Dictionary::Ptr groupData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}});
groups.emplace_back(JsonEncode(groupData));
notificationRecipients.emplace_back(id); notificationRecipients.emplace_back(id);
notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}))); Dictionary::Ptr notificationRecipientData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}});
notificationRecipients.emplace_back(JsonEncode(notificationRecipientData));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":usergroup"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":usergroup", groupData);
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", notificationRecipientData);
} }
usergroupIds->Add(usergroupId); usergroupIds->Add(usergroupId);
@ -942,10 +942,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
String userId = GetObjectIdentifier(user); String userId = GetObjectIdentifier(user);
String id = HashValue(new Array(Prepend(env, Prepend("user", Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object)))))); String id = HashValue(new Array(Prepend(env, Prepend("user", Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object))))));
notificationRecipients.emplace_back(id); notificationRecipients.emplace_back(id);
notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}))); Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}});
notificationRecipients.emplace_back(JsonEncode(data));
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":recipient"].emplace_back(id); AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", data);
} }
} }
@ -990,12 +991,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
typeArgs.emplace_back(id); typeArgs.emplace_back(id);
typeArgs.emplace_back(JsonEncode(values)); typeArgs.emplace_back(JsonEncode(values));
if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":argument"].emplace_back(id);
}
argChksms.emplace_back(id); argChksms.emplace_back(id);
argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); String checksum = HashValue(kv.second);
argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
if (runtimeUpdate) {
values->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":argument", values);
}
} }
} }
@ -1037,12 +1040,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
typeVars.emplace_back(id); typeVars.emplace_back(id);
typeVars.emplace_back(JsonEncode(values)); typeVars.emplace_back(JsonEncode(values));
if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName + ":envvar"].emplace_back(id);
}
varChksms.emplace_back(id); varChksms.emplace_back(id);
varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); String checksum = HashValue(kv.second);
varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
if (runtimeUpdate) {
values->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":envvar", values);
}
} }
} }
@ -1055,9 +1060,14 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable)
if (!m_Rcon || !m_Rcon->IsConnected()) if (!m_Rcon || !m_Rcon->IsConnected())
return; return;
String objectType = GetLowerCaseTypeNameDB(checkable);
String objectKey = GetObjectIdentifier(checkable);
Dictionary::Ptr stateAttrs = SerializeState(checkable); Dictionary::Ptr stateAttrs = SerializeState(checkable);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State); m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::State);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::State);
} }
// Used to update a single object, used for runtime updates // Used to update a single object, used for runtime updates
@ -1068,15 +1078,23 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
String typeName = GetLowerCaseTypeNameDB(object); String typeName = GetLowerCaseTypeNameDB(object);
std::map<String, std::vector<String>> hMSets, publishes; std::map<String, std::vector<String>> hMSets;
std::vector<String> states = {"HMSET", m_PrefixStateObject + typeName}; std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, typeName, hMSets, publishes, runtimeUpdate); CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) { if (checkable) {
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State); Dictionary::Ptr state = SerializeState(checkable);
publishes["icinga:config:update:state:" + typeName].emplace_back(objectKey); String checksum = HashValue(state);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::State);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::State);
if (runtimeUpdate) {
state->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName + ":state", state);
}
} }
std::vector<std::vector<String> > transaction = {{"MULTI"}}; std::vector<std::vector<String> > transaction = {{"MULTI"}};
@ -1088,17 +1106,19 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
} }
} }
for (auto& kv : publishes) { for (auto& objectAttributes : runtimeUpdates) {
for (auto& message : kv.second) { std::vector<String> xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
std::vector<String> publish; ObjectLock olock(objectAttributes);
publish.reserve(3); for (const Dictionary::Pair& kv : objectAttributes) {
publish.emplace_back("PUBLISH"); String value = IcingaToStreamValue(kv.second);
publish.emplace_back(kv.first); if (!value.IsEmpty()) {
publish.emplace_back(std::move(message)); xAdd.emplace_back(kv.first);
xAdd.emplace_back(value);
transaction.emplace_back(std::move(publish)); }
} }
transaction.emplace_back(std::move(xAdd));
} }
if (transaction.size() > 1) { if (transaction.size() > 1) {
@ -1111,6 +1131,15 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
} }
} }
void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey,
const String& redisKey, const Dictionary::Ptr& data)
{
data->Set("id", objectKey);
data->Set("redis_key", redisKey);
data->Set("runtime_type", "upsert");
runtimeUpdates.emplace_back(data);
}
// Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant // Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant
// for IcingaDB. // for IcingaDB.
bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums) bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums)
@ -1385,7 +1414,7 @@ bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& a
*/ */
void void
IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate) std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
{ {
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
if (!runtimeUpdate && m_ConfigDumpInProgress) if (!runtimeUpdate && m_ConfigDumpInProgress)
@ -1401,7 +1430,7 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN
if (!PrepareObject(object, attr, chksm)) if (!PrepareObject(object, attr, chksm))
return; return;
InsertObjectDependencies(object, typeName, hMSets, publishes, runtimeUpdate); InsertObjectDependencies(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
auto& attrs (hMSets[m_PrefixConfigObject + typeName]); auto& attrs (hMSets[m_PrefixConfigObject + typeName]);
@ -1410,12 +1439,14 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN
attrs.emplace_back(objectKey); attrs.emplace_back(objectKey);
attrs.emplace_back(JsonEncode(attr)); attrs.emplace_back(JsonEncode(attr));
String checksum = HashValue(attr);
chksms.emplace_back(objectKey); chksms.emplace_back(objectKey);
chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(attr)}}))); chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
/* Send an update event to subscribers. */ /* Send an update event to subscribers. */
if (runtimeUpdate) { if (runtimeUpdate) {
publishes["icinga:config:update:" + typeName].emplace_back(objectKey); attr->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName, attr);
} }
} }
@ -1425,22 +1456,24 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
m_Rcon->FireAndForgetQueries({ m_Rcon->FireAndForgetQueries({
{"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"HDEL", m_PrefixConfigObject + typeName, objectKey},
{"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, {
{"PUBLISH", "icinga:config:delete:" + typeName, objectKey} "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*",
}, Prio::Config); "redis_key", m_PrefixConfigObject + typeName, "id", objectKey, "runtime_type", "delete"
}
}, Prio::Config);
auto checkable (dynamic_pointer_cast<Checkable>(object)); auto checkable (dynamic_pointer_cast<Checkable>(object));
if (checkable) { if (checkable) {
m_Rcon->FireAndForgetQuery( m_Rcon->FireAndForgetQueries({
{ {
"ZREM", "ZREM",
dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host",
GetObjectIdentifier(checkable) GetObjectIdentifier(checkable)
}, },
Prio::CheckResult {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey},
); }, Prio::CheckResult);
} }
} }
@ -1470,19 +1503,22 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu
tie(host, service) = GetHostService(checkable); tie(host, service) = GetHostService(checkable);
String streamname; String redisKey;
if (service) if (service)
streamname = "icinga:state:stream:service"; redisKey = "icinga:service:state";
else else
streamname = "icinga:state:stream:host"; redisKey = "icinga:host:state";
Dictionary::Ptr objectAttrs = SerializeState(checkable); Dictionary::Ptr objectAttrs = SerializeState(checkable);
objectAttrs->Set("redis_key", redisKey);
objectAttrs->Set("runtime_type", "upsert");
objectAttrs->Set("checksum", HashValue(objectAttrs));
std::vector<String> streamadd({"XADD", streamname, "*"}); std::vector<String> streamadd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
ObjectLock olock(objectAttrs); ObjectLock olock(objectAttrs);
for (const Dictionary::Pair& kv : objectAttrs) { for (const Dictionary::Pair& kv : objectAttrs) {
streamadd.emplace_back(kv.first); streamadd.emplace_back(kv.first);
streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); streamadd.emplace_back(IcingaToStreamValue(kv.second));
} }
m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State);
@ -2074,7 +2110,17 @@ Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable)
tie(host, service) = GetHostService(checkable); tie(host, service) = GetHostService(checkable);
attrs->Set("id", GetObjectIdentifier(checkable));; String id = GetObjectIdentifier(checkable);
/*
* As there is a 1:1 relationship between host and host state, the host ID ('object_id')
* is also used as the host state ID ('id'). These are duplicated to 1) avoid having
* special handling for this in Icinga DB and 2) to have both a primary key and a foreign key
* in the SQL database in the end. In the database 'object_id' ends up as foreign key 'host_state.host_id'
* referring to 'host.id' while 'id' ends up as the primary key 'host_state.id'. This also applies for service.
*/
attrs->Set("id", id);
attrs->Set("object_id", id);
attrs->Set("environment_id", m_EnvironmentId); attrs->Set("environment_id", m_EnvironmentId);
attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard); attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard);

View File

@ -212,3 +212,18 @@ String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj)
long long IcingaDB::TimestampToMilliseconds(double timestamp) { long long IcingaDB::TimestampToMilliseconds(double timestamp) {
return static_cast<long long>(timestamp * 1000); return static_cast<long long>(timestamp * 1000);
} }
String IcingaDB::IcingaToStreamValue(const Value& value)
{
switch (value.GetType()) {
case ValueBoolean:
return Convert::ToString(int(value));
case ValueString:
return Utility::ValidateUTF8(value);
case ValueNumber:
case ValueEmpty:
return Convert::ToString(value);
default:
return JsonEncode(value);
}
}

View File

@ -48,9 +48,8 @@ IcingaDB::IcingaDB()
m_WorkQueue.SetName("IcingaDB"); m_WorkQueue.SetName("IcingaDB");
m_PrefixConfigObject = "icinga:config:"; m_PrefixConfigObject = "icinga:";
m_PrefixConfigCheckSum = "icinga:checksum:"; m_PrefixConfigCheckSum = "icinga:checksum:";
m_PrefixStateObject = "icinga:config:state:";
} }
/** /**

View File

@ -61,13 +61,15 @@ private:
std::vector<String> GetTypeOverwriteKeys(const String& type); std::vector<String> GetTypeOverwriteKeys(const String& type);
std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type); std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate); std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void UpdateState(const Checkable::Ptr& checkable); void UpdateState(const Checkable::Ptr& checkable);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets, void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate); std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object); void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
void AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey,
const String& redisKey, const Dictionary::Ptr& data);
void SendSentNotification( void SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
@ -93,6 +95,7 @@ private:
static String FormatCheckSumBinary(const String& str); static String FormatCheckSumBinary(const String& str);
static String FormatCommandLine(const Value& commandLine); static String FormatCommandLine(const Value& commandLine);
static long long TimestampToMilliseconds(double timestamp); static long long TimestampToMilliseconds(double timestamp);
static String IcingaToStreamValue(const Value& value);
static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object); static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object);
static String GetObjectIdentifier(const ConfigObject::Ptr& object); static String GetObjectIdentifier(const ConfigObject::Ptr& object);
@ -152,7 +155,6 @@ private:
String m_PrefixConfigObject; String m_PrefixConfigObject;
String m_PrefixConfigCheckSum; String m_PrefixConfigCheckSum;
String m_PrefixStateObject;
bool m_ConfigDumpInProgress; bool m_ConfigDumpInProgress;
bool m_ConfigDumpDone; bool m_ConfigDumpDone;