From 07823c4b905164ab95852ef5ab2b15e2fba21fdb Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 Jun 2019 09:51:13 +0200 Subject: [PATCH] RedisWriter#CreateConfigUpdate(): don't require prepared commands --- lib/redis/rediswriter-objects.cpp | 113 +++++++++++++++--------------- lib/redis/rediswriter.hpp | 1 - 2 files changed, 55 insertions(+), 59 deletions(-) diff --git a/lib/redis/rediswriter-objects.cpp b/lib/redis/rediswriter-objects.cpp index 628a58ce8..8199198be 100644 --- a/lib/redis/rediswriter-objects.cpp +++ b/lib/redis/rediswriter-objects.cpp @@ -120,21 +120,19 @@ void RedisWriter::UpdateAllConfigObjects() }; DeleteKeys(globalKeys); - upq.ParallelFor(types, [this, &globalKeys](const TypePair& type) { + upq.ParallelFor(types, [this](const TypePair& type) { String lcType = type.second; std::vector keys = GetTypeObjectKeys(lcType); DeleteKeys(keys); - keys.insert(keys.end(), globalKeys.begin(), globalKeys.end()); - auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); WorkQueue upqObjectType(25000, Configuration::Concurrency); upqObjectType.SetName("RedisWriter:ConfigDump:" + lcType); - upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType, &keys](decltype(objectChunks)::const_reference chunk) { - std::map > statements = GenerateHmsetStatements(keys); + upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) { + std::map> statements; std::vector states = {"HMSET", m_PrefixStateObject + lcType}; std::vector > transaction = {{"MULTI"}}; @@ -155,16 +153,19 @@ void RedisWriter::UpdateAllConfigObjects() bulkCounter++; if (!bulkCounter % 100) { - for (const auto& kv : statements) - if (kv.second.size() > 2) - transaction.push_back(kv.second); + for (auto& kv : statements) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } if (states.size() > 2) { transaction.push_back(std::move(states)); states = {"HMSET", m_PrefixStateObject + lcType}; } - statements = GenerateHmsetStatements(keys); + statements = decltype(statements)(); if (transaction.size() > 1) { transaction.push_back({"EXEC"}); @@ -174,9 +175,12 @@ void RedisWriter::UpdateAllConfigObjects() } } - for (const auto& kv : statements) - if (kv.second.size() > 2) - transaction.push_back(kv.second); + for (auto& kv : statements) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } if (states.size() > 2) transaction.push_back(std::move(states)); @@ -251,16 +255,6 @@ void RedisWriter::DeleteKeys(const std::vector& keys) { m_Rcon->ExecuteQuery(query); } -std::map > RedisWriter::GenerateHmsetStatements(const std::vector keys) -{ - std::map > statements; - for (auto& key : keys) { - statements.emplace(key, std::vector({"HMSET", key})); - } - - return std::move(statements); -} - std::vector RedisWriter::GetTypeObjectKeys(const String& type) { std::vector keys = { @@ -313,9 +307,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons if (customVarObject) { auto vars(SerializeVars(customVarObject)); if (vars) { - auto& typeCvs (statements.at(m_PrefixConfigObject + typeName + ":customvar")); - auto& allCvs (statements.at(m_PrefixConfigObject + "customvar")); - auto& cvChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":customvar")); + auto& typeCvs (statements[m_PrefixConfigObject + typeName + ":customvar"]); + auto& allCvs (statements[m_PrefixConfigObject + "customvar"]); + auto& cvChksms (statements[m_PrefixConfigCheckSum + typeName + ":customvar"]); cvChksms.emplace_back(objectKey); cvChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumVars(customVarObject)}}))); @@ -344,17 +338,17 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons String notesUrl = checkable->GetNotesUrl(); String iconImage = checkable->GetIconImage(); if (!actionUrl.IsEmpty()) { - auto& actionUrls (statements.at(m_PrefixConfigObject + "action_url")); + auto& actionUrls (statements[m_PrefixConfigObject + "action_url"]); actionUrls.emplace_back(CalculateCheckSumArray(new Array({envId, actionUrl}))); actionUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"action_url", actionUrl}}))); } if (!notesUrl.IsEmpty()) { - auto& notesUrls (statements.at(m_PrefixConfigObject + "notes_url")); + auto& notesUrls (statements[m_PrefixConfigObject + "notes_url"]); notesUrls.emplace_back(CalculateCheckSumArray(new Array({envId, notesUrl}))); notesUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"notes_url", notesUrl}}))); } if (!iconImage.IsEmpty()) { - auto& iconImages (statements.at(m_PrefixConfigObject + "icon_image")); + auto& iconImages (statements[m_PrefixConfigObject + "icon_image"]); iconImages.emplace_back(CalculateCheckSumArray(new Array({envId, iconImage}))); iconImages.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"icon_image", iconImage}}))); } @@ -383,8 +377,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons groupIds->Add(GetObjectIdentifier((*getGroup)(group))); } - auto& members (statements.at(m_PrefixConfigObject + typeName + ":groupmember")); - auto& memberChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":groupmember")); + auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]); memberChksms.emplace_back(objectKey); memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}}))); @@ -402,9 +396,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons if (ranges) { ObjectLock rangesLock(ranges); Array::Ptr rangeIds(new Array); - auto& typeRanges (statements.at(m_PrefixConfigObject + typeName + ":range")); - auto& allRanges (statements.at(m_PrefixConfigObject + "timerange")); - auto& rangeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":range")); + auto& typeRanges (statements[m_PrefixConfigObject + typeName + ":range"]); + auto& allRanges (statements[m_PrefixConfigObject + "timerange"]); + auto& rangeChksms (statements[m_PrefixConfigCheckSum + typeName + ":range"]); rangeIds->Reserve(ranges->GetLength()); @@ -438,8 +432,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get()))); } - auto& includs (statements.at(m_PrefixConfigObject + typeName + ":overwrite:include")); - auto& includeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":overwrite:include")); + auto& includs (statements[m_PrefixConfigObject + typeName + ":overwrite:include"]); + auto& includeChksms (statements[m_PrefixConfigCheckSum + typeName + ":overwrite:include"]); includeChksms.emplace_back(objectKey); includeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(includes)}}))); @@ -463,8 +457,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get()))); } - auto& excluds (statements.at(m_PrefixConfigObject + typeName + ":overwrite:exclude")); - auto& excludeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":overwrite:exclude")); + auto& excluds (statements[m_PrefixConfigObject + typeName + ":overwrite:exclude"]); + auto& excludeChksms (statements[m_PrefixConfigCheckSum + typeName + ":overwrite:exclude"]); excludeChksms.emplace_back(objectKey); excludeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(excludes)}}))); @@ -486,8 +480,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons parents->Add(GetObjectIdentifier(parent)); } - auto& parnts (statements.at(m_PrefixConfigObject + typeName + ":parent")); - auto& parentChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":parent")); + auto& parnts (statements[m_PrefixConfigObject + typeName + ":parent"]); + auto& parentChksms (statements[m_PrefixConfigCheckSum + typeName + ":parent"]); parentChksms.emplace_back(objectKey); parentChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(zone->GetAllParents())}}))); @@ -516,8 +510,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons groupIds->Add(GetObjectIdentifier((*getGroup)(group))); } - auto& members (statements.at(m_PrefixConfigObject + typeName + ":groupmember")); - auto& memberChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":groupmember")); + auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]); memberChksms.emplace_back(objectKey); memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}}))); @@ -543,8 +537,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons userIds->Add(GetObjectIdentifier(user)); } - auto& usrs (statements.at(m_PrefixConfigObject + typeName + ":user")); - auto& userChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":user")); + auto& usrs (statements[m_PrefixConfigObject + typeName + ":user"]); + auto& userChksms (statements[m_PrefixConfigCheckSum + typeName + ":user"]); userChksms.emplace_back(objectKey); userChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(userIds)}}))); @@ -557,8 +551,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons usergroupIds->Add(GetObjectIdentifier(usergroup)); } - auto& groups (statements.at(m_PrefixConfigObject + typeName + ":usergroup")); - auto& groupChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":usergroup")); + auto& groups (statements[m_PrefixConfigObject + typeName + ":usergroup"]); + auto& groupChksms (statements[m_PrefixConfigCheckSum + typeName + ":usergroup"]); groupChksms.emplace_back(objectKey); groupChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(usergroupIds)}}))); @@ -575,9 +569,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons if (arguments) { ObjectLock argumentsLock(arguments); Array::Ptr argumentIds(new Array); - auto& typeArgs (statements.at(m_PrefixConfigObject + typeName + ":argument")); - auto& allArgs (statements.at(m_PrefixConfigObject + "commandargument")); - auto& argChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":argument")); + auto& typeArgs (statements[m_PrefixConfigObject + typeName + ":argument"]); + auto& allArgs (statements[m_PrefixConfigObject + "commandargument"]); + auto& argChksms (statements[m_PrefixConfigCheckSum + typeName + ":argument"]); argumentIds->Reserve(arguments->GetLength()); @@ -599,9 +593,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons if (envvars) { ObjectLock envvarsLock(envvars); Array::Ptr envvarIds(new Array); - auto& typeVars (statements.at(m_PrefixConfigObject + typeName + ":envvar")); - auto& allVars (statements.at(m_PrefixConfigObject + "commandenvvar")); - auto& varChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":envvar")); + auto& typeVars (statements[m_PrefixConfigObject + typeName + ":envvar"]); + auto& allVars (statements[m_PrefixConfigObject + "commandenvvar"]); + auto& varChksms (statements[m_PrefixConfigCheckSum + typeName + ":envvar"]); envvarIds->Reserve(envvars->GetLength()); @@ -638,9 +632,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime String typeName = GetLowerCaseTypeNameDB(object); - std::vector keys = GetTypeObjectKeys(typeName); - - std::map > statements = GenerateHmsetStatements(keys); + std::map> statements; std::vector states = {"HMSET", m_PrefixStateObject + typeName}; CreateConfigUpdate(object, typeName, statements, runtimeUpdate); @@ -651,8 +643,13 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime } std::vector > transaction = {{"MULTI"}}; - for (const auto& kv : statements) - transaction.push_back(kv.second); + + for (auto& kv : statements) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } if (transaction.size() > 1) { transaction.push_back({"EXEC"}); @@ -926,8 +923,8 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty InsertObjectDependencies(object, typeName, statements); String objectKey = GetObjectIdentifier(object); - auto& attrs (statements.at(m_PrefixConfigObject + typeName)); - auto& chksms (statements.at(m_PrefixConfigCheckSum + typeName)); + auto& attrs (statements[m_PrefixConfigObject + typeName]); + auto& chksms (statements[m_PrefixConfigCheckSum + typeName]); attrs.emplace_back(objectKey); attrs.emplace_back(JsonEncode(attr)); diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 26d7fb647..16a9a83ca 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -72,7 +72,6 @@ private: void UpdateAllConfigObjects(); std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); void DeleteKeys(const std::vector& keys); - std::map > GenerateHmsetStatements(const std::vector keys); std::vector GetTypeObjectKeys(const String& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map >& statements); void UpdateState(const Checkable::Ptr& checkable);