RedisWriter: Write objects of same type in parallel

This commit is contained in:
Noah Hilverling 2019-06-19 15:40:20 +02:00 committed by Michael Friedrich
parent 68c88b3edf
commit 13295acb72
2 changed files with 86 additions and 46 deletions

View File

@ -127,62 +127,80 @@ void RedisWriter::UpdateAllConfigObjects()
keys.reserve(globalKeys.size());
keys.insert(keys.end(), globalKeys.begin(), globalKeys.end());
std::map<String, std::vector<String> > statements = GenerateHmsetStatements(keys);
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
std::vector<Array::Ptr> objectChunks = ChunkObjects(type.first->GetObjects(), 500);
bool dumpState = (lcType == "host" || lcType == "service");
WorkQueue upqObjectType(25000, Configuration::Concurrency);
upqObjectType.SetName("RedisWriter:ConfigDump:" + lcType);
size_t bulkCounter = 0;
for (const ConfigObject::Ptr& object : type.first->GetObjects()) {
if (lcType != GetLowerCaseTypeNameDB(object))
continue;
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType, &keys](const Array::Ptr chunk) {
ObjectLock chunkLock(chunk);
std::map<String, std::vector<String> > statements = GenerateHmsetStatements(keys);
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
CreateConfigUpdate(object, lcType, statements, false);
bool dumpState = (lcType == "host" || lcType == "service");
// Write out inital state for checkables
if (dumpState) {
states.emplace_back(GetObjectIdentifier(object));
states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast<Checkable>(object))));
}
size_t bulkCounter = 0;
for (const ConfigObject::Ptr& object : chunk) {
if (lcType != GetLowerCaseTypeNameDB(object))
continue;
bulkCounter++;
if (!bulkCounter % 100) {
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
CreateConfigUpdate(object, lcType, statements, false);
if (states.size() > 2) {
transaction.push_back(std::move(states));
states = {"HMSET", m_PrefixStateObject + lcType};
// Write out inital state for checkables
if (dumpState) {
states.emplace_back(GetObjectIdentifier(object));
states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast<Checkable>(object))));
}
statements = GenerateHmsetStatements(keys);
bulkCounter++;
if (!bulkCounter % 100) {
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(transaction);
transaction = {{"MULTI"}};
if (states.size() > 2) {
transaction.push_back(std::move(states));
states = {"HMSET", m_PrefixStateObject + lcType};
}
statements = GenerateHmsetStatements(keys);
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(transaction);
transaction = {{"MULTI"}};
}
}
}
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
if (states.size() > 2)
transaction.push_back(std::move(states));
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(transaction);
}
m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType});
Log(LogNotice, "RedisWriter")
<< "Dumped " << bulkCounter << " objects of type " << type.second;
});
upqObjectType.Join();
if (upqObjectType.HasExceptions()) {
for (boost::exception_ptr exc : upqObjectType.GetExceptions()) {
if (exc) {
boost::rethrow_exception(exc);
}
}
}
for (const auto& kv : statements)
if (kv.second.size() > 2)
transaction.push_back(kv.second);
if (states.size() > 2)
transaction.push_back(std::move(states));
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(transaction);
}
m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType});
Log(LogNotice, "RedisWriter")
<< "Dumped " << bulkCounter << " objects of type " << type.second;
});
upq.Join();
@ -204,6 +222,27 @@ void RedisWriter::UpdateAllConfigObjects()
<< "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
}
std::vector<Array::Ptr> RedisWriter::ChunkObjects(std::vector<intrusive_ptr<ConfigObject> > objects, size_t chunkSize) {
std::vector<Array::Ptr> chunks;
Array::Ptr currentChunk(new Array);
size_t currentChunkSize = 0;
for (auto object : objects) {
currentChunk->Add(object);
currentChunkSize++;
if (currentChunkSize >= chunkSize) {
chunks.push_back(currentChunk);
currentChunk = new Array();
currentChunkSize = 0;
}
}
if (currentChunkSize > 0) {
chunks.push_back(currentChunk);
}
return chunks;
}
void RedisWriter::DeleteKeys(const std::vector<String>& keys) {
std::vector<String> query = {"DEL"};
for (auto& key : keys) {
@ -213,7 +252,7 @@ void RedisWriter::DeleteKeys(const std::vector<String>& keys) {
m_Rcon->ExecuteQuery(query);
}
std::map<String, std::vector<String> > RedisWriter::GenerateHmsetStatements(const std::vector<String>& keys)
std::map<String, std::vector<String> > RedisWriter::GenerateHmsetStatements(const std::vector<String> keys)
{
std::map<String, std::vector<String> > statements;
for (auto& key : keys) {

View File

@ -70,8 +70,9 @@ private:
/* config & status dump */
void UpdateAllConfigObjects();
std::vector<Array::Ptr> ChunkObjects(std::vector<intrusive_ptr<ConfigObject> > objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys);
std::map<String, std::vector<String> > GenerateHmsetStatements(const std::vector<String>& keys);
std::map<String, std::vector<String> > GenerateHmsetStatements(const std::vector<String> keys);
std::vector<String> GetTypeObjectKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String> >& statements);
void UpdateState(const Checkable::Ptr& checkable);