RedisWriter#CreateConfigUpdate(): don't require prepared commands

This commit is contained in:
Alexander A. Klimov 2019-06-28 09:51:13 +02:00 committed by Michael Friedrich
parent 1d126b66e9
commit 07823c4b90
2 changed files with 55 additions and 59 deletions

View File

@ -120,21 +120,19 @@ void RedisWriter::UpdateAllConfigObjects()
};
DeleteKeys(globalKeys);
upq.ParallelFor(types, [this, &globalKeys](const TypePair& type) {
upq.ParallelFor(types, [this](const TypePair& type) {
String lcType = type.second;
std::vector<String> keys = GetTypeObjectKeys(lcType);
DeleteKeys(keys);
keys.insert(keys.end(), globalKeys.begin(), globalKeys.end());
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
WorkQueue upqObjectType(25000, Configuration::Concurrency);
upqObjectType.SetName("RedisWriter:ConfigDump:" + lcType);
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType, &keys](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String> > statements = GenerateHmsetStatements(keys);
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> statements;
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
@ -155,16 +153,19 @@ void RedisWriter::UpdateAllConfigObjects()
bulkCounter++;
if (!bulkCounter % 100) {
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
for (auto& kv : statements) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
if (states.size() > 2) {
transaction.push_back(std::move(states));
states = {"HMSET", m_PrefixStateObject + lcType};
}
statements = GenerateHmsetStatements(keys);
statements = decltype(statements)();
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
@ -174,9 +175,12 @@ void RedisWriter::UpdateAllConfigObjects()
}
}
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
for (auto& kv : statements) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
if (states.size() > 2)
transaction.push_back(std::move(states));
@ -251,16 +255,6 @@ void RedisWriter::DeleteKeys(const std::vector<String>& keys) {
m_Rcon->ExecuteQuery(query);
}
std::map<String, std::vector<String> > RedisWriter::GenerateHmsetStatements(const std::vector<String> keys)
{
std::map<String, std::vector<String> > statements;
for (auto& key : keys) {
statements.emplace(key, std::vector<String>({"HMSET", key}));
}
return std::move(statements);
}
std::vector<String> RedisWriter::GetTypeObjectKeys(const String& type)
{
std::vector<String> keys = {
@ -313,9 +307,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (customVarObject) {
auto vars(SerializeVars(customVarObject));
if (vars) {
auto& typeCvs (statements.at(m_PrefixConfigObject + typeName + ":customvar"));
auto& allCvs (statements.at(m_PrefixConfigObject + "customvar"));
auto& cvChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":customvar"));
auto& typeCvs (statements[m_PrefixConfigObject + typeName + ":customvar"]);
auto& allCvs (statements[m_PrefixConfigObject + "customvar"]);
auto& cvChksms (statements[m_PrefixConfigCheckSum + typeName + ":customvar"]);
cvChksms.emplace_back(objectKey);
cvChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumVars(customVarObject)}})));
@ -344,17 +338,17 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
String notesUrl = checkable->GetNotesUrl();
String iconImage = checkable->GetIconImage();
if (!actionUrl.IsEmpty()) {
auto& actionUrls (statements.at(m_PrefixConfigObject + "action_url"));
auto& actionUrls (statements[m_PrefixConfigObject + "action_url"]);
actionUrls.emplace_back(CalculateCheckSumArray(new Array({envId, actionUrl})));
actionUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"action_url", actionUrl}})));
}
if (!notesUrl.IsEmpty()) {
auto& notesUrls (statements.at(m_PrefixConfigObject + "notes_url"));
auto& notesUrls (statements[m_PrefixConfigObject + "notes_url"]);
notesUrls.emplace_back(CalculateCheckSumArray(new Array({envId, notesUrl})));
notesUrls.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"notes_url", notesUrl}})));
}
if (!iconImage.IsEmpty()) {
auto& iconImages (statements.at(m_PrefixConfigObject + "icon_image"));
auto& iconImages (statements[m_PrefixConfigObject + "icon_image"]);
iconImages.emplace_back(CalculateCheckSumArray(new Array({envId, iconImage})));
iconImages.emplace_back(JsonEncode(new Dictionary({{"env_id", envId}, {"icon_image", iconImage}})));
}
@ -383,8 +377,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
groupIds->Add(GetObjectIdentifier((*getGroup)(group)));
}
auto& members (statements.at(m_PrefixConfigObject + typeName + ":groupmember"));
auto& memberChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":groupmember"));
auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
memberChksms.emplace_back(objectKey);
memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}})));
@ -402,9 +396,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (ranges) {
ObjectLock rangesLock(ranges);
Array::Ptr rangeIds(new Array);
auto& typeRanges (statements.at(m_PrefixConfigObject + typeName + ":range"));
auto& allRanges (statements.at(m_PrefixConfigObject + "timerange"));
auto& rangeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":range"));
auto& typeRanges (statements[m_PrefixConfigObject + typeName + ":range"]);
auto& allRanges (statements[m_PrefixConfigObject + "timerange"]);
auto& rangeChksms (statements[m_PrefixConfigCheckSum + typeName + ":range"]);
rangeIds->Reserve(ranges->GetLength());
@ -438,8 +432,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get<String>())));
}
auto& includs (statements.at(m_PrefixConfigObject + typeName + ":overwrite:include"));
auto& includeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":overwrite:include"));
auto& includs (statements[m_PrefixConfigObject + typeName + ":overwrite:include"]);
auto& includeChksms (statements[m_PrefixConfigCheckSum + typeName + ":overwrite:include"]);
includeChksms.emplace_back(objectKey);
includeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(includes)}})));
@ -463,8 +457,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get<String>())));
}
auto& excluds (statements.at(m_PrefixConfigObject + typeName + ":overwrite:exclude"));
auto& excludeChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":overwrite:exclude"));
auto& excluds (statements[m_PrefixConfigObject + typeName + ":overwrite:exclude"]);
auto& excludeChksms (statements[m_PrefixConfigCheckSum + typeName + ":overwrite:exclude"]);
excludeChksms.emplace_back(objectKey);
excludeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(excludes)}})));
@ -486,8 +480,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
parents->Add(GetObjectIdentifier(parent));
}
auto& parnts (statements.at(m_PrefixConfigObject + typeName + ":parent"));
auto& parentChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":parent"));
auto& parnts (statements[m_PrefixConfigObject + typeName + ":parent"]);
auto& parentChksms (statements[m_PrefixConfigCheckSum + typeName + ":parent"]);
parentChksms.emplace_back(objectKey);
parentChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(zone->GetAllParents())}})));
@ -516,8 +510,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
groupIds->Add(GetObjectIdentifier((*getGroup)(group)));
}
auto& members (statements.at(m_PrefixConfigObject + typeName + ":groupmember"));
auto& memberChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":groupmember"));
auto& members (statements[m_PrefixConfigObject + typeName + ":groupmember"]);
auto& memberChksms (statements[m_PrefixConfigCheckSum + typeName + ":groupmember"]);
memberChksms.emplace_back(objectKey);
memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}})));
@ -543,8 +537,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
userIds->Add(GetObjectIdentifier(user));
}
auto& usrs (statements.at(m_PrefixConfigObject + typeName + ":user"));
auto& userChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":user"));
auto& usrs (statements[m_PrefixConfigObject + typeName + ":user"]);
auto& userChksms (statements[m_PrefixConfigCheckSum + typeName + ":user"]);
userChksms.emplace_back(objectKey);
userChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(userIds)}})));
@ -557,8 +551,8 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
usergroupIds->Add(GetObjectIdentifier(usergroup));
}
auto& groups (statements.at(m_PrefixConfigObject + typeName + ":usergroup"));
auto& groupChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":usergroup"));
auto& groups (statements[m_PrefixConfigObject + typeName + ":usergroup"]);
auto& groupChksms (statements[m_PrefixConfigCheckSum + typeName + ":usergroup"]);
groupChksms.emplace_back(objectKey);
groupChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(usergroupIds)}})));
@ -575,9 +569,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (arguments) {
ObjectLock argumentsLock(arguments);
Array::Ptr argumentIds(new Array);
auto& typeArgs (statements.at(m_PrefixConfigObject + typeName + ":argument"));
auto& allArgs (statements.at(m_PrefixConfigObject + "commandargument"));
auto& argChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":argument"));
auto& typeArgs (statements[m_PrefixConfigObject + typeName + ":argument"]);
auto& allArgs (statements[m_PrefixConfigObject + "commandargument"]);
auto& argChksms (statements[m_PrefixConfigCheckSum + typeName + ":argument"]);
argumentIds->Reserve(arguments->GetLength());
@ -599,9 +593,9 @@ void RedisWriter::InsertObjectDependencies(const ConfigObject::Ptr& object, cons
if (envvars) {
ObjectLock envvarsLock(envvars);
Array::Ptr envvarIds(new Array);
auto& typeVars (statements.at(m_PrefixConfigObject + typeName + ":envvar"));
auto& allVars (statements.at(m_PrefixConfigObject + "commandenvvar"));
auto& varChksms (statements.at(m_PrefixConfigCheckSum + typeName + ":envvar"));
auto& typeVars (statements[m_PrefixConfigObject + typeName + ":envvar"]);
auto& allVars (statements[m_PrefixConfigObject + "commandenvvar"]);
auto& varChksms (statements[m_PrefixConfigCheckSum + typeName + ":envvar"]);
envvarIds->Reserve(envvars->GetLength());
@ -638,9 +632,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
String typeName = GetLowerCaseTypeNameDB(object);
std::vector<String> keys = GetTypeObjectKeys(typeName);
std::map<String, std::vector<String> > statements = GenerateHmsetStatements(keys);
std::map<String, std::vector<String>> statements;
std::vector<String> states = {"HMSET", m_PrefixStateObject + typeName};
CreateConfigUpdate(object, typeName, statements, runtimeUpdate);
@ -651,8 +643,13 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
}
std::vector<std::vector<String> > transaction = {{"MULTI"}};
for (const auto& kv : statements)
transaction.push_back(kv.second);
for (auto& kv : statements) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
@ -926,8 +923,8 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty
InsertObjectDependencies(object, typeName, statements);
String objectKey = GetObjectIdentifier(object);
auto& attrs (statements.at(m_PrefixConfigObject + typeName));
auto& chksms (statements.at(m_PrefixConfigCheckSum + typeName));
auto& attrs (statements[m_PrefixConfigObject + typeName]);
auto& chksms (statements[m_PrefixConfigCheckSum + typeName]);
attrs.emplace_back(objectKey);
attrs.emplace_back(JsonEncode(attr));

View File

@ -72,7 +72,6 @@ private:
void UpdateAllConfigObjects();
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys);
std::map<String, std::vector<String> > GenerateHmsetStatements(const std::vector<String> keys);
std::vector<String> GetTypeObjectKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String> >& statements);
void UpdateState(const Checkable::Ptr& checkable);