mirror of https://github.com/Icinga/icinga2.git
Icinga DB: Sync runtime updates using streams
This commit is contained in:
parent
aca8d063dc
commit
ab04a4ee98
|
@ -217,8 +217,9 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
std::mutex ourContentMutex;
|
||||
|
||||
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
|
||||
std::map<String, std::vector<String>> hMSets, publishes;
|
||||
std::map<String, std::vector<String>> hMSets;
|
||||
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
std::vector<std::vector<String> > transaction = {{"MULTI"}};
|
||||
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
|
||||
|
||||
|
@ -242,7 +243,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
if (lcType != GetLowerCaseTypeNameDB(object))
|
||||
continue;
|
||||
|
||||
CreateConfigUpdate(object, lcType, hMSets, publishes, false);
|
||||
CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false);
|
||||
|
||||
// Write out inital state for checkables
|
||||
if (dumpState) {
|
||||
|
@ -266,21 +267,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
states = {"HMSET", m_PrefixStateObject + lcType};
|
||||
}
|
||||
|
||||
for (auto& kv : publishes) {
|
||||
for (auto& message : kv.second) {
|
||||
std::vector<String> publish;
|
||||
|
||||
publish.reserve(3);
|
||||
publish.emplace_back("PUBLISH");
|
||||
publish.emplace_back(kv.first);
|
||||
publish.emplace_back(std::move(message));
|
||||
|
||||
transaction.emplace_back(std::move(publish));
|
||||
}
|
||||
}
|
||||
|
||||
hMSets = decltype(hMSets)();
|
||||
publishes = decltype(publishes)();
|
||||
|
||||
if (transaction.size() > 1) {
|
||||
transaction.push_back({"EXEC"});
|
||||
|
@ -319,19 +306,6 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
if (states.size() > 2)
|
||||
transaction.emplace_back(std::move(states));
|
||||
|
||||
for (auto& kv : publishes) {
|
||||
for (auto& message : kv.second) {
|
||||
std::vector<String> publish;
|
||||
|
||||
publish.reserve(3);
|
||||
publish.emplace_back("PUBLISH");
|
||||
publish.emplace_back(kv.first);
|
||||
publish.emplace_back(std::move(message));
|
||||
|
||||
transaction.emplace_back(std::move(publish));
|
||||
}
|
||||
}
|
||||
|
||||
if (transaction.size() > 1) {
|
||||
transaction.push_back({"EXEC"});
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
|
@ -610,7 +584,7 @@ static ConfigObject::Ptr GetObjectByName(const String& name)
|
|||
}
|
||||
|
||||
void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
|
||||
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate)
|
||||
std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
|
||||
{
|
||||
String objectKey = GetObjectIdentifier(object);
|
||||
CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object);
|
||||
|
@ -631,18 +605,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
if (runtimeUpdate || m_DumpedGlobals.CustomVar.IsNew(kv.first)) {
|
||||
allCvs.emplace_back(kv.first);
|
||||
allCvs.emplace_back(JsonEncode(kv.second));
|
||||
}
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:customvar"].emplace_back(kv.first);
|
||||
if (runtimeUpdate) {
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, kv.first, m_PrefixConfigObject + "customvar", kv.second);
|
||||
}
|
||||
}
|
||||
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(kv.first, GetObjectIdentifiersWithoutEnv(object)))));
|
||||
typeCvs.emplace_back(id);
|
||||
typeCvs.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}})));
|
||||
|
||||
Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}});
|
||||
typeCvs.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":customvar"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":customvar", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -662,11 +638,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
if (runtimeUpdate || m_DumpedGlobals.ActionUrl.IsNew(id)) {
|
||||
actionUrls.emplace_back(std::move(id));
|
||||
actionUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}})));
|
||||
}
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}});
|
||||
actionUrls.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:action_url"].emplace_back(actionUrls.at(actionUrls.size() - 2u));
|
||||
if (runtimeUpdate) {
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, actionUrls.at(actionUrls.size() - 2u), m_PrefixConfigObject + "action_url", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!notesUrl.IsEmpty()) {
|
||||
|
@ -676,11 +653,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
if (runtimeUpdate || m_DumpedGlobals.NotesUrl.IsNew(id)) {
|
||||
notesUrls.emplace_back(std::move(id));
|
||||
notesUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}})));
|
||||
}
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}});
|
||||
notesUrls.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:notes_url"].emplace_back(notesUrls.at(notesUrls.size() - 2u));
|
||||
if (runtimeUpdate) {
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, notesUrls.at(notesUrls.size() - 2u), m_PrefixConfigObject + "notes_url", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!iconImage.IsEmpty()) {
|
||||
|
@ -690,11 +668,12 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
if (runtimeUpdate || m_DumpedGlobals.IconImage.IsNew(id)) {
|
||||
iconImages.emplace_back(std::move(id));
|
||||
iconImages.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}})));
|
||||
}
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}});
|
||||
iconImages.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:icon_image"].emplace_back(iconImages.at(iconImages.size() - 2u));
|
||||
if (runtimeUpdate) {
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, iconImages.at(iconImages.size() - 2u), m_PrefixConfigObject + "icon_image", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -725,10 +704,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
String groupId = GetObjectIdentifier(groupObj);
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
members.emplace_back(id);
|
||||
members.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"object_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}});
|
||||
members.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":groupmember", data);
|
||||
}
|
||||
|
||||
groupIds->Add(groupId);
|
||||
|
@ -755,10 +735,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(kv.first, Prepend(kv.second, GetObjectIdentifiersWithoutEnv(object))))));
|
||||
typeRanges.emplace_back(id);
|
||||
typeRanges.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}});
|
||||
typeRanges.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":range"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":range", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -784,10 +765,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(includeTp), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
includs.emplace_back(id);
|
||||
includs.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}});
|
||||
includs.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":override:include"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:include", data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -813,10 +795,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(excludeTp), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
excluds.emplace_back(id);
|
||||
excluds.emplace_back(JsonEncode(new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}});
|
||||
excluds.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":override:exclude"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:exclude", data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -836,10 +819,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
for (auto& parent : parentsRaw) {
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(parent), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
parnts.emplace_back(id);
|
||||
parnts.emplace_back(JsonEncode(new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"zone_id", objectKey}, {"environment_id", m_EnvironmentId}, {"parent_id", GetObjectIdentifier(parent)}});
|
||||
parnts.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":parent"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":parent", data);
|
||||
}
|
||||
|
||||
parents->Add(GetObjectIdentifier(parent));
|
||||
|
@ -870,10 +854,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
String groupId = GetObjectIdentifier(groupObj);
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(groupObj), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
members.emplace_back(id);
|
||||
members.emplace_back(JsonEncode(new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"group_id", groupId}});
|
||||
members.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":groupmember"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":groupmember", data);
|
||||
}
|
||||
|
||||
groupIds->Add(groupId);
|
||||
|
@ -904,10 +889,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
String userId = GetObjectIdentifier(user);
|
||||
String id = HashValue(new Array(Prepend(env, Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object)))));
|
||||
usrs.emplace_back(id);
|
||||
usrs.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}});
|
||||
usrs.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":user"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":user", data);
|
||||
}
|
||||
|
||||
userIds->Add(userId);
|
||||
|
@ -926,13 +912,16 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
|
||||
String id = HashValue(new Array(Prepend(env, Prepend("usergroup", Prepend(GetObjectIdentifiersWithoutEnv(usergroup), GetObjectIdentifiersWithoutEnv(object))))));
|
||||
groups.emplace_back(id);
|
||||
groups.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}})));
|
||||
Dictionary::Ptr groupData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}});
|
||||
groups.emplace_back(JsonEncode(groupData));
|
||||
|
||||
notificationRecipients.emplace_back(id);
|
||||
notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}})));
|
||||
Dictionary::Ptr notificationRecipientData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}});
|
||||
notificationRecipients.emplace_back(JsonEncode(notificationRecipientData));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":usergroup"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":usergroup", groupData);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", notificationRecipientData);
|
||||
}
|
||||
|
||||
usergroupIds->Add(usergroupId);
|
||||
|
@ -942,10 +931,11 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
String userId = GetObjectIdentifier(user);
|
||||
String id = HashValue(new Array(Prepend(env, Prepend("user", Prepend(GetObjectIdentifiersWithoutEnv(user), GetObjectIdentifiersWithoutEnv(object))))));
|
||||
notificationRecipients.emplace_back(id);
|
||||
notificationRecipients.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}})));
|
||||
Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}});
|
||||
notificationRecipients.emplace_back(JsonEncode(data));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":recipient"].emplace_back(id);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -990,12 +980,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
typeArgs.emplace_back(id);
|
||||
typeArgs.emplace_back(JsonEncode(values));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":argument"].emplace_back(id);
|
||||
}
|
||||
|
||||
argChksms.emplace_back(id);
|
||||
argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}})));
|
||||
String checksum = HashValue(kv.second);
|
||||
argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
values->Set("checksum", checksum);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":argument", values);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1037,12 +1029,14 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
|
|||
typeVars.emplace_back(id);
|
||||
typeVars.emplace_back(JsonEncode(values));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName + ":envvar"].emplace_back(id);
|
||||
}
|
||||
|
||||
varChksms.emplace_back(id);
|
||||
varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}})));
|
||||
String checksum = HashValue(kv.second);
|
||||
varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
|
||||
|
||||
if (runtimeUpdate) {
|
||||
values->Set("checksum", checksum);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":envvar", values);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1068,10 +1062,11 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
|
|||
|
||||
String typeName = GetLowerCaseTypeNameDB(object);
|
||||
|
||||
std::map<String, std::vector<String>> hMSets, publishes;
|
||||
std::map<String, std::vector<String>> hMSets;
|
||||
std::vector<String> states = {"HMSET", m_PrefixStateObject + typeName};
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
|
||||
CreateConfigUpdate(object, typeName, hMSets, publishes, runtimeUpdate);
|
||||
CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
|
||||
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
|
||||
if (checkable) {
|
||||
String objectKey = GetObjectIdentifier(object);
|
||||
|
@ -1088,17 +1083,19 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
|
|||
}
|
||||
}
|
||||
|
||||
for (auto& kv : publishes) {
|
||||
for (auto& message : kv.second) {
|
||||
std::vector<String> publish;
|
||||
for (auto& objectAttributes : runtimeUpdates) {
|
||||
std::vector<String> xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
|
||||
ObjectLock olock(objectAttributes);
|
||||
|
||||
publish.reserve(3);
|
||||
publish.emplace_back("PUBLISH");
|
||||
publish.emplace_back(kv.first);
|
||||
publish.emplace_back(std::move(message));
|
||||
|
||||
transaction.emplace_back(std::move(publish));
|
||||
for (const Dictionary::Pair& kv : objectAttributes) {
|
||||
String value = IcingaToStreamValue(kv.second);
|
||||
if (!value.IsEmpty()) {
|
||||
xAdd.emplace_back(kv.first);
|
||||
xAdd.emplace_back(value);
|
||||
}
|
||||
}
|
||||
|
||||
transaction.emplace_back(std::move(xAdd));
|
||||
}
|
||||
|
||||
if (transaction.size() > 1) {
|
||||
|
@ -1111,6 +1108,15 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
|
|||
}
|
||||
}
|
||||
|
||||
void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey,
|
||||
const String& redisKey, const Dictionary::Ptr& data)
|
||||
{
|
||||
data->Set("id", objectKey);
|
||||
data->Set("redis_key", redisKey);
|
||||
data->Set("runtime_type", "upsert");
|
||||
runtimeUpdates.emplace_back(data);
|
||||
}
|
||||
|
||||
// Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant
|
||||
// for IcingaDB.
|
||||
bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums)
|
||||
|
@ -1385,7 +1391,7 @@ bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& a
|
|||
*/
|
||||
void
|
||||
IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
|
||||
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate)
|
||||
std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
|
||||
{
|
||||
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
|
||||
if (!runtimeUpdate && m_ConfigDumpInProgress)
|
||||
|
@ -1401,7 +1407,7 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN
|
|||
if (!PrepareObject(object, attr, chksm))
|
||||
return;
|
||||
|
||||
InsertObjectDependencies(object, typeName, hMSets, publishes, runtimeUpdate);
|
||||
InsertObjectDependencies(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
|
||||
|
||||
String objectKey = GetObjectIdentifier(object);
|
||||
auto& attrs (hMSets[m_PrefixConfigObject + typeName]);
|
||||
|
@ -1410,12 +1416,14 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN
|
|||
attrs.emplace_back(objectKey);
|
||||
attrs.emplace_back(JsonEncode(attr));
|
||||
|
||||
String checksum = HashValue(attr);
|
||||
chksms.emplace_back(objectKey);
|
||||
chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(attr)}})));
|
||||
chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}})));
|
||||
|
||||
/* Send an update event to subscribers. */
|
||||
if (runtimeUpdate) {
|
||||
publishes["icinga:config:update:" + typeName].emplace_back(objectKey);
|
||||
attr->Set("checksum", checksum);
|
||||
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName, attr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1425,10 +1433,12 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
|
|||
String objectKey = GetObjectIdentifier(object);
|
||||
|
||||
m_Rcon->FireAndForgetQueries({
|
||||
{"HDEL", m_PrefixConfigObject + typeName, objectKey},
|
||||
{"DEL", m_PrefixStateObject + typeName + ":" + objectKey},
|
||||
{"PUBLISH", "icinga:config:delete:" + typeName, objectKey}
|
||||
}, Prio::Config);
|
||||
{"HDEL", m_PrefixConfigObject + typeName, objectKey},
|
||||
{
|
||||
"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*",
|
||||
"redis_key", m_PrefixConfigObject + typeName, "id", objectKey, "runtime_type", "delete"
|
||||
}
|
||||
}, Prio::Config);
|
||||
|
||||
auto checkable (dynamic_pointer_cast<Checkable>(object));
|
||||
|
||||
|
|
|
@ -212,3 +212,18 @@ String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj)
|
|||
long long IcingaDB::TimestampToMilliseconds(double timestamp) {
|
||||
return static_cast<long long>(timestamp * 1000);
|
||||
}
|
||||
|
||||
String IcingaDB::IcingaToStreamValue(const Value& value)
|
||||
{
|
||||
switch (value.GetType()) {
|
||||
case ValueBoolean:
|
||||
return Convert::ToString((unsigned short)value);
|
||||
case ValueString:
|
||||
return Utility::ValidateUTF8(value);
|
||||
case ValueNumber:
|
||||
case ValueEmpty:
|
||||
return Convert::ToString(value);
|
||||
default:
|
||||
return JsonEncode(value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,13 +61,15 @@ private:
|
|||
std::vector<String> GetTypeOverwriteKeys(const String& type);
|
||||
std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type);
|
||||
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
|
||||
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
|
||||
std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
|
||||
void UpdateState(const Checkable::Ptr& checkable);
|
||||
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
|
||||
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets,
|
||||
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
|
||||
std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
|
||||
void SendConfigDelete(const ConfigObject::Ptr& object);
|
||||
void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
|
||||
void AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey,
|
||||
const String& redisKey, const Dictionary::Ptr& data);
|
||||
|
||||
void SendSentNotification(
|
||||
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
|
||||
|
@ -93,6 +95,7 @@ private:
|
|||
static String FormatCheckSumBinary(const String& str);
|
||||
static String FormatCommandLine(const Value& commandLine);
|
||||
static long long TimestampToMilliseconds(double timestamp);
|
||||
static String IcingaToStreamValue(const Value& value);
|
||||
|
||||
static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object);
|
||||
static String GetObjectIdentifier(const ConfigObject::Ptr& object);
|
||||
|
|
Loading…
Reference in New Issue