From 13295acb721f2acdc0b725cff64258815ac44426 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 19 Jun 2019 15:40:20 +0200 Subject: [PATCH] RedisWriter: Write objects of same type in parallel --- lib/redis/rediswriter-objects.cpp | 129 +++++++++++++++++++----------- lib/redis/rediswriter.hpp | 3 +- 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/lib/redis/rediswriter-objects.cpp b/lib/redis/rediswriter-objects.cpp index a0d5a3626..9ff4c95a1 100644 --- a/lib/redis/rediswriter-objects.cpp +++ b/lib/redis/rediswriter-objects.cpp @@ -127,62 +127,80 @@ void RedisWriter::UpdateAllConfigObjects() keys.reserve(globalKeys.size()); keys.insert(keys.end(), globalKeys.begin(), globalKeys.end()); - std::map > statements = GenerateHmsetStatements(keys); - std::vector states = {"HMSET", m_PrefixStateObject + lcType}; - std::vector > transaction = {{"MULTI"}}; + std::vector objectChunks = ChunkObjects(type.first->GetObjects(), 500); - bool dumpState = (lcType == "host" || lcType == "service"); + WorkQueue upqObjectType(25000, Configuration::Concurrency); + upqObjectType.SetName("RedisWriter:ConfigDump:" + lcType); - size_t bulkCounter = 0; - for (const ConfigObject::Ptr& object : type.first->GetObjects()) { - if (lcType != GetLowerCaseTypeNameDB(object)) - continue; + upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType, &keys](const Array::Ptr chunk) { + ObjectLock chunkLock(chunk); + std::map > statements = GenerateHmsetStatements(keys); + std::vector states = {"HMSET", m_PrefixStateObject + lcType}; + std::vector > transaction = {{"MULTI"}}; - CreateConfigUpdate(object, lcType, statements, false); + bool dumpState = (lcType == "host" || lcType == "service"); - // Write out inital state for checkables - if (dumpState) { - states.emplace_back(GetObjectIdentifier(object)); - states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast(object)))); - } + size_t bulkCounter = 0; + for (const ConfigObject::Ptr& object : chunk) { + if (lcType != GetLowerCaseTypeNameDB(object)) + continue; - bulkCounter++; - if (!bulkCounter % 100) { - for (const auto& kv : statements) - if (kv.second.size() > 2) - transaction.push_back(kv.second); + CreateConfigUpdate(object, lcType, statements, false); - if (states.size() > 2) { - transaction.push_back(std::move(states)); - states = {"HMSET", m_PrefixStateObject + lcType}; + // Write out inital state for checkables + if (dumpState) { + states.emplace_back(GetObjectIdentifier(object)); + states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast(object)))); } - statements = GenerateHmsetStatements(keys); + bulkCounter++; + if (!bulkCounter % 100) { + for (const auto& kv : statements) + if (kv.second.size() > 2) + transaction.push_back(kv.second); - if (transaction.size() > 1) { - transaction.push_back({"EXEC"}); - m_Rcon->ExecuteQueries(transaction); - transaction = {{"MULTI"}}; + if (states.size() > 2) { + transaction.push_back(std::move(states)); + states = {"HMSET", m_PrefixStateObject + lcType}; + } + + statements = GenerateHmsetStatements(keys); + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->ExecuteQueries(transaction); + transaction = {{"MULTI"}}; + } + } + } + + for (const auto& kv : statements) + if (kv.second.size() > 2) + transaction.push_back(kv.second); + + if (states.size() > 2) + transaction.push_back(std::move(states)); + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->ExecuteQueries(transaction); + } + + m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType}); + + Log(LogNotice, "RedisWriter") + << "Dumped " << bulkCounter << " objects of type " << type.second; + }); + + upqObjectType.Join(); + + if (upqObjectType.HasExceptions()) { + for (boost::exception_ptr exc : upqObjectType.GetExceptions()) { + if (exc) { + boost::rethrow_exception(exc); } } } - - for (const auto& kv : statements) - if (kv.second.size() > 2) - transaction.push_back(kv.second); - - if (states.size() > 2) - transaction.push_back(std::move(states)); - - if (transaction.size() > 1) { - transaction.push_back({"EXEC"}); - m_Rcon->ExecuteQueries(transaction); - } - - m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType}); - - Log(LogNotice, "RedisWriter") - << "Dumped " << bulkCounter << " objects of type " << type.second; }); upq.Join(); @@ -204,6 +222,27 @@ void RedisWriter::UpdateAllConfigObjects() << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; } +std::vector RedisWriter::ChunkObjects(std::vector > objects, size_t chunkSize) { + std::vector chunks; + Array::Ptr currentChunk(new Array); + size_t currentChunkSize = 0; + for (auto object : objects) { + currentChunk->Add(object); + currentChunkSize++; + if (currentChunkSize >= chunkSize) { + chunks.push_back(currentChunk); + currentChunk = new Array(); + currentChunkSize = 0; + } + } + + if (currentChunkSize > 0) { + chunks.push_back(currentChunk); + } + + return chunks; +} + void RedisWriter::DeleteKeys(const std::vector& keys) { std::vector query = {"DEL"}; for (auto& key : keys) { @@ -213,7 +252,7 @@ void RedisWriter::DeleteKeys(const std::vector& keys) { m_Rcon->ExecuteQuery(query); } -std::map > RedisWriter::GenerateHmsetStatements(const std::vector& keys) +std::map > RedisWriter::GenerateHmsetStatements(const std::vector keys) { std::map > statements; for (auto& key : keys) { diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 5c3123446..56ed9ce33 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -70,8 +70,9 @@ private: /* config & status dump */ void UpdateAllConfigObjects(); + std::vector ChunkObjects(std::vector > objects, size_t chunkSize); void DeleteKeys(const std::vector& keys); - std::map > GenerateHmsetStatements(const std::vector& keys); + std::map > GenerateHmsetStatements(const std::vector keys); std::vector GetTypeObjectKeys(const String& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map >& statements); void UpdateState(const Checkable::Ptr& checkable);