RedisWriter#InsertObjectDependencies(): PUBLISH icinga:config:update ... for all dependencies

This commit is contained in:
Alexander A. Klimov 2019-09-18 12:04:59 +02:00 committed by Michael Friedrich
parent 322f3fbb0d
commit b0cd306b61
2 changed files with 154 additions and 44 deletions

View File

@ -145,7 +145,7 @@ void RedisWriter::UpdateAllConfigObjects()
upqObjectType.SetName("RedisWriter:ConfigDump:" + lcType);
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> statements;
std::map<String, std::vector<String>> hMSets, publishes;
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
@ -156,7 +156,7 @@ void RedisWriter::UpdateAllConfigObjects()
if (lcType != GetLowerCaseTypeNameDB(object))
continue;
CreateConfigUpdate(object, lcType, statements, false);
CreateConfigUpdate(object, lcType, hMSets, publishes, false);
// Write out inital state for checkables
if (dumpState) {
@ -166,7 +166,7 @@ void RedisWriter::UpdateAllConfigObjects()
bulkCounter++;
if (!(bulkCounter % 100)) {
for (auto& kv : statements) {
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
@ -178,7 +178,21 @@ void RedisWriter::UpdateAllConfigObjects()
states = {"HMSET", m_PrefixStateObject + lcType};
}
statements = decltype(statements)();
for (auto& kv : publishes) {
for (auto& message : kv.second) {
std::vector<String> publish;
publish.reserve(3);
publish.emplace_back("PUBLISH");
publish.emplace_back(kv.first);
publish.emplace_back(std::move(message));
transaction.emplace_back(std::move(publish));
}
}
hMSets = decltype(hMSets)();
publishes = decltype(publishes)();
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
@ -188,7 +202,7 @@ void RedisWriter::UpdateAllConfigObjects()
}
}
for (auto& kv : statements) {
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
@ -198,6 +212,19 @@ void RedisWriter::UpdateAllConfigObjects()
if (states.size() > 2)
transaction.emplace_back(std::move(states));
for (auto& kv : publishes) {
for (auto& message : kv.second) {
std::vector<String> publish;
publish.reserve(3);
publish.emplace_back("PUBLISH");
publish.emplace_back(kv.first);
publish.emplace_back(std::move(message));
transaction.emplace_back(std::move(publish));
}
}
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction));
@ -313,18 +340,20 @@ static ConfigObject::Ptr GetObjectByName(const String& name)
return ConfigObject::GetObject<ConfigType>(name);
}
void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String> >& statements)
void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate)
{
String objectKey = GetObjectIdentifier(object);
CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object);
String envId = CalculateCheckSumString(GetEnvironment());
auto* configUpdates (runtimeUpdate ? &publishes["icinga:config:update"] : nullptr);
if (customVarObject) {
auto vars(SerializeVars(customVarObject));
if (vars) {
auto& typeCvs (statements[m_PrefixConfigObject + typeName + ":customvar"]);
auto& allCvs (statements[m_PrefixConfigObject + "customvar"]);
auto& cvChksms (statements[m_PrefixConfigCheckSum + typeName + ":customvar"]);
auto& typeCvs (hMSets[m_PrefixConfigObject + typeName + ":customvar"]);
auto& allCvs (hMSets[m_PrefixConfigObject + "customvar"]);
auto& cvChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":customvar"]);
cvChksms.emplace_back(objectKey);
cvChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumVars(customVarObject)}})));
@ -337,9 +366,18 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
for (auto& kv : vars) {
allCvs.emplace_back(kv.first);
allCvs.emplace_back(JsonEncode(kv.second));
if (configUpdates) {
configUpdates->emplace_back("customvar:" + kv.first);
}
String id = CalculateCheckSumArray(new Array({envId, kv.first, objectKey}));
typeCvs.emplace_back(id);
typeCvs.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"env_id", envId}, {"customvar_id", kv.first}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":customvar:" + id);
}
}
}
}
@ -352,19 +390,31 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
String notesUrl = checkable->GetNotesUrl();
String iconImage = checkable->GetIconImage();
if (!actionUrl.IsEmpty()) {
auto& actionUrls (statements[m_PrefixConfigObject + "action_url"]);
auto& actionUrls (hMSets[m_PrefixConfigObject + "action_url"]);
actionUrls.emplace_back(CalculateCheckSumArray(new Array({envId, actionUrl})));
actionUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"action_url", actionUrl}})));
if (configUpdates) {
configUpdates->emplace_back("action_url:" + actionUrls.at(actionUrls.size() - 2u));
}
}
if (!notesUrl.IsEmpty()) {
auto& notesUrls (statements[m_PrefixConfigObject + "notes_url"]);
auto& notesUrls (hMSets[m_PrefixConfigObject + "notes_url"]);
notesUrls.emplace_back(CalculateCheckSumArray(new Array({envId, notesUrl})));
notesUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"notes_url", notesUrl}})));
if (configUpdates) {
configUpdates->emplace_back("notes_url:" + notesUrls.at(notesUrls.size() - 2u));
}
}
if (!iconImage.IsEmpty()) {
auto& iconImages (statements[m_PrefixConfigObject + "icon_image"]);
auto& iconImages (hMSets[m_PrefixConfigObject + "icon_image"]);
iconImages.emplace_back(CalculateCheckSumArray(new Array({envId, iconImage})));
iconImages.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"icon_image", iconImage}})));
if (configUpdates) {
configUpdates->emplace_back("icon_image:" + iconImages.at(iconImages.size() - 2u));
}
}
Host::Ptr host;
@ -387,8 +437,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
groupIds->Reserve(groups->GetLength());
auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
for (auto& group : groups) {
String groupId = GetObjectIdentifier((*getGroup)(group));
@ -396,6 +446,10 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
members.emplace_back(id);
members.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"env_id", envId}, {"group_id", groupId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":groupmember:" + id);
}
groupIds->Add(groupId);
}
@ -413,8 +467,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (ranges) {
ObjectLock rangesLock(ranges);
Array::Ptr rangeIds(new Array);
auto& typeRanges (statements[m_PrefixConfigObject + typeName + ":range"]);
auto& rangeChksms (statements[m_PrefixConfigCheckSum + typeName + ":range"]);
auto& typeRanges (hMSets[m_PrefixConfigObject + typeName + ":range"]);
auto& rangeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":range"]);
rangeIds->Reserve(ranges->GetLength());
@ -425,6 +479,10 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
String id = CalculateCheckSumArray(new Array({envId, rangeId, objectKey}));
typeRanges.emplace_back(id);
typeRanges.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":range:" + id);
}
}
rangeChksms.emplace_back(objectKey);
@ -444,8 +502,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
includeChecksums->Reserve(includes->GetLength());
auto& includs (statements[m_PrefixConfigObject + typeName + ":override:include"]);
auto& includeChksms (statements[m_PrefixConfigCheckSum + typeName + ":override:include"]);
auto& includs (hMSets[m_PrefixConfigObject + typeName + ":override:include"]);
auto& includeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":override:include"]);
for (auto include : includes) {
String includeId = GetObjectIdentifier((*getInclude)(include.Get<String>()));
includeChecksums->Add(includeId);
@ -453,6 +511,10 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
String id = CalculateCheckSumArray(new Array({envId, includeId, objectKey}));
includs.emplace_back(id);
includs.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"timeperiod_id", objectKey}, {"include_id", includeId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":override:include:" + id);
}
}
includeChksms.emplace_back(objectKey);
@ -471,8 +533,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
excludeChecksums->Reserve(excludes->GetLength());
auto& excluds (statements[m_PrefixConfigObject + typeName + ":override:exclude"]);
auto& excludeChksms (statements[m_PrefixConfigCheckSum + typeName + ":override:exclude"]);
auto& excluds (hMSets[m_PrefixConfigObject + typeName + ":override:exclude"]);
auto& excludeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":override:exclude"]);
for (auto exclude : excludes) {
String excludeId = GetObjectIdentifier((*getExclude)(exclude.Get<String>()));
@ -481,6 +543,10 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
String id = CalculateCheckSumArray(new Array({envId, excludeId, objectKey}));
excluds.emplace_back(id);
excluds.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":override:exclude:" + id);
}
}
excludeChksms.emplace_back(objectKey);
@ -497,14 +563,19 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
parents->Reserve(parentsRaw.size());
auto& parnts (statements[m_PrefixConfigObject + typeName + ":parent"]);
auto& parentChksms (statements[m_PrefixConfigCheckSum + typeName + ":parent"]);
auto& parnts (hMSets[m_PrefixConfigObject + typeName + ":parent"]);
auto& parentChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":parent"]);
for (auto& parent : parentsRaw) {
String parentId = GetObjectIdentifier(parent);
String id = CalculateCheckSumArray(new Array({envId, parentId, objectKey}));
parnts.emplace_back(id);
parnts.emplace_back(JsonEncode(new Dictionary({{"zone_id", objectKey}, {"env_id", envId}, {"parent_id", parentId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":parent:" + id);
}
parents->Add(GetObjectIdentifier(parent));
}
@ -529,14 +600,19 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
groupIds->Reserve(groups->GetLength());
auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
for (auto& group : groups) {
String groupId = GetObjectIdentifier((*getGroup)(group));
String id = CalculateCheckSumArray(new Array({envId, groupId, objectKey}));
members.emplace_back(id);
members.emplace_back(JsonEncode(new Dictionary({{"user_id", objectKey}, {"env_id", envId}, {"group_id", groupId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":groupmember:" + id);
}
groupIds->Add(groupId);
}
@ -558,14 +634,19 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
userIds->Reserve(users.size());
auto& usrs (statements[m_PrefixConfigObject + typeName + ":user"]);
auto& userChksms (statements[m_PrefixConfigCheckSum + typeName + ":user"]);
auto& usrs (hMSets[m_PrefixConfigObject + typeName + ":user"]);
auto& userChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":user"]);
for (auto& user : users) {
String userId = GetObjectIdentifier(user);
String id = CalculateCheckSumArray(new Array({envId, userId, objectKey}));
usrs.emplace_back(id);
usrs.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"env_id", envId}, {"user_id", userId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":user:" + id);
}
userIds->Add(userId);
}
@ -574,14 +655,19 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
usergroupIds->Reserve(usergroups.size());
auto& groups (statements[m_PrefixConfigObject + typeName + ":usergroup"]);
auto& groupChksms (statements[m_PrefixConfigCheckSum + typeName + ":usergroup"]);
auto& groups (hMSets[m_PrefixConfigObject + typeName + ":usergroup"]);
auto& groupChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":usergroup"]);
for (auto& usergroup : usergroups) {
String usergroupId = GetObjectIdentifier(usergroup);
String id = CalculateCheckSumArray(new Array({envId, usergroupId, objectKey}));
groups.emplace_back(id);
groups.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"env_id", envId}, {"usergroup_id", usergroupId}})));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":usergroup:" + id);
}
usergroupIds->Add(usergroupId);
}
@ -597,8 +683,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
Dictionary::Ptr arguments = command->GetArguments();
if (arguments) {
ObjectLock argumentsLock(arguments);
auto& typeArgs (statements[m_PrefixConfigObject + typeName + ":argument"]);
auto& argChksms (statements[m_PrefixConfigCheckSum + typeName + ":argument"]);
auto& typeArgs (hMSets[m_PrefixConfigObject + typeName + ":argument"]);
auto& argChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":argument"]);
for (auto& kv : arguments) {
Dictionary::Ptr values;
@ -620,6 +706,11 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
typeArgs.emplace_back(id);
typeArgs.emplace_back(JsonEncode(values));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":argument:" + id);
}
argChksms.emplace_back(id);
argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}})));
}
@ -629,8 +720,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (envvars) {
ObjectLock envvarsLock(envvars);
Array::Ptr envvarIds(new Array);
auto& typeVars (statements[m_PrefixConfigObject + typeName + ":envvar"]);
auto& varChksms (statements[m_PrefixConfigCheckSum + typeName + ":envvar"]);
auto& typeVars (hMSets[m_PrefixConfigObject + typeName + ":envvar"]);
auto& varChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":envvar"]);
envvarIds->Reserve(envvars->GetLength());
@ -654,6 +745,11 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
typeVars.emplace_back(id);
typeVars.emplace_back(JsonEncode(values));
if (configUpdates) {
configUpdates->emplace_back(typeName + ":envvar:" + id);
}
varChksms.emplace_back(id);
varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}})));
}
@ -678,10 +774,10 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
String typeName = GetLowerCaseTypeNameDB(object);
std::map<String, std::vector<String>> statements;
std::map<String, std::vector<String>> hMSets, publishes;
std::vector<String> states = {"HMSET", m_PrefixStateObject + typeName};
CreateConfigUpdate(object, typeName, statements, runtimeUpdate);
CreateConfigUpdate(object, typeName, hMSets, publishes, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) {
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName,
@ -690,13 +786,26 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
std::vector<std::vector<String> > transaction = {{"MULTI"}};
for (auto& kv : statements) {
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
for (auto& kv : publishes) {
for (auto& message : kv.second) {
std::vector<String> publish;
publish.reserve(3);
publish.emplace_back("PUBLISH");
publish.emplace_back(kv.first);
publish.emplace_back(std::move(message));
transaction.emplace_back(std::move(publish));
}
}
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction));
@ -952,8 +1061,8 @@ bool RedisWriter::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr
* icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure.
*/
void
RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String> >& statements,
bool runtimeUpdate)
RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate)
{
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
if (!runtimeUpdate && m_ConfigDumpInProgress)
@ -969,11 +1078,11 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty
if (!PrepareObject(object, attr, chksm))
return;
InsertObjectDependencies(object, typeName, statements);
InsertObjectDependencies(object, typeName, hMSets, publishes, runtimeUpdate);
String objectKey = GetObjectIdentifier(object);
auto& attrs (statements[m_PrefixConfigObject + typeName]);
auto& chksms (statements[m_PrefixConfigCheckSum + typeName]);
auto& attrs (hMSets[m_PrefixConfigObject + typeName]);
auto& chksms (hMSets[m_PrefixConfigCheckSum + typeName]);
attrs.emplace_back(objectKey);
attrs.emplace_back(JsonEncode(attr));
@ -983,7 +1092,7 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty
/* Send an update event to subscribers. */
if (runtimeUpdate) {
m_Rcon->FireAndForgetQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey});
publishes["icinga:config:update"].emplace_back(typeName + ":" + objectKey);
}
}

View File

@ -73,11 +73,12 @@ private:
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys);
std::vector<String> GetTypeObjectKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String> >& statements);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void UpdateState(const Checkable::Ptr& checkable);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String> >& statements,
bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object);
std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);