diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 8ff1ae554..d85e788a6 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -158,20 +159,66 @@ void IcingaDB::UpdateAllConfigObjects() upq.ParallelFor(types, [this](const TypePair& type) { String lcType = type.second; - std::vector keys = GetTypeObjectKeys(lcType); + std::vector keys = GetTypeOverwriteKeys(lcType); DeleteKeys(keys, Prio::Config); - auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); - WorkQueue upqObjectType(25000, Configuration::Concurrency); upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); - upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) { + std::map redisCheckSums; + String configCheckSum = m_PrefixConfigCheckSum + lcType; + + upqObjectType.Enqueue([this, &configCheckSum, &redisCheckSums]() { + String cursor = "0"; + + do { + Array::Ptr res = m_Rcon->GetResultOfQuery({ + "HSCAN", configCheckSum, cursor, "COUNT", "1000" + }, Prio::Config); + + Array::Ptr kvs = res->Get(1); + Value* key = nullptr; + ObjectLock oLock (kvs); + + for (auto& kv : kvs) { + if (key) { + redisCheckSums.emplace(std::move(*key), std::move(kv)); + key = nullptr; + } else { + key = &kv; + } + } + + cursor = res->Get(0); + } while (cursor != "0"); + }); + + auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); + String configObject = m_PrefixConfigObject + lcType; + + // Skimmed away attributes and checksums HMSETs' keys and values by Redis key. + std::map>> ourContentRaw {{configCheckSum, {}}, {configObject, {}}}; + std::mutex ourContentMutex; + + upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { std::map> hMSets, publishes; std::vector states = {"HMSET", m_PrefixStateObject + lcType}; std::vector > transaction = {{"MULTI"}}; std::vector hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; + auto skimObjects ([&]() { + std::lock_guard l (ourContentMutex); + + for (auto& kv : ourContentRaw) { + auto pos (hMSets.find(kv.first)); + + if (pos != hMSets.end()) { + kv.second.emplace_back(std::move(pos->second)); + hMSets.erase(pos); + } + } + }); + bool dumpState = (lcType == "host" || lcType == "service"); size_t bulkCounter = 0; @@ -189,6 +236,8 @@ void IcingaDB::UpdateAllConfigObjects() bulkCounter++; if (!(bulkCounter % 100)) { + skimObjects(); + for (auto& kv : hMSets) { if (!kv.second.empty()) { kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); @@ -242,6 +291,8 @@ void IcingaDB::UpdateAllConfigObjects() } } + skimObjects(); + for (auto& kv : hMSets) { if (!kv.second.empty()) { kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); @@ -290,6 +341,127 @@ void IcingaDB::UpdateAllConfigObjects() } } + std::map> ourContent; + + for (auto& source : ourContentRaw) { + upqObjectType.Enqueue([&]() { + auto& dest (ourContent[source.first]); + + for (auto& hMSet : source.second) { + for (decltype(hMSet.size()) i = 0, stop = hMSet.size() - 1u; i < stop; i += 2u) { + dest.emplace(std::move(hMSet[i]), std::move(hMSet[i + 1u])); + } + + hMSet.clear(); + } + + source.second.clear(); + }); + } + + upqObjectType.Join(); + ourContentRaw.clear(); + + auto& ourCheckSums (ourContent[configCheckSum]); + auto& ourObjects (ourContent[configObject]); + std::vector setChecksum, setObject, delChecksum, delObject; + + auto redisCurrent (redisCheckSums.begin()); + auto redisEnd (redisCheckSums.end()); + auto ourCurrent (ourCheckSums.begin()); + auto ourEnd (ourCheckSums.end()); + + auto flushSets ([&]() { + setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum}); + setObject.insert(setObject.begin(), {"HMSET", configObject}); + + std::vector> transaction; + + transaction.emplace_back(std::vector{"MULTI"}); + transaction.emplace_back(std::move(setChecksum)); + transaction.emplace_back(std::move(setObject)); + transaction.emplace_back(std::vector{"EXEC"}); + + setChecksum.clear(); + setObject.clear(); + + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + }); + + auto flushDels ([&]() { + delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum}); + delObject.insert(delObject.begin(), {"HDEL", configObject}); + + std::vector> transaction; + + transaction.emplace_back(std::vector{"MULTI"}); + transaction.emplace_back(std::move(delChecksum)); + transaction.emplace_back(std::move(delObject)); + transaction.emplace_back(std::vector{"EXEC"}); + + delChecksum.clear(); + delObject.clear(); + + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + }); + + auto setOne ([&]() { + setChecksum.emplace_back(ourCurrent->first); + setChecksum.emplace_back(ourCurrent->second); + setObject.emplace_back(ourCurrent->first); + setObject.emplace_back(ourObjects[ourCurrent->first]); + + if (setChecksum.size() == 100u) { + flushSets(); + } + }); + + auto delOne ([&]() { + delChecksum.emplace_back(redisCurrent->first); + delObject.emplace_back(redisCurrent->first); + + if (delChecksum.size() == 100u) { + flushDels(); + } + }); + + for (;;) { + if (redisCurrent == redisEnd) { + for (; ourCurrent != ourEnd; ++ourCurrent) { + setOne(); + } + + break; + } else if (ourCurrent == ourEnd) { + for (; redisCurrent != redisEnd; ++redisCurrent) { + delOne(); + } + + break; + } else if (redisCurrent->first < ourCurrent->first) { + delOne(); + ++redisCurrent; + } else if (redisCurrent->first > ourCurrent->first) { + setOne(); + ++ourCurrent; + } else { + if (redisCurrent->second != ourCurrent->second) { + setOne(); + } + + ++redisCurrent; + ++ourCurrent; + } + } + + if (delChecksum.size()) { + flushDels(); + } + + if (setChecksum.size()) { + flushSets(); + } + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config); }); @@ -348,11 +520,9 @@ void IcingaDB::DeleteKeys(const std::vector& keys, RedisConnection::Quer m_Rcon->FireAndForgetQuery(std::move(query), priority); } -std::vector IcingaDB::GetTypeObjectKeys(const String& type) +std::vector IcingaDB::GetTypeOverwriteKeys(const String& type) { std::vector keys = { - m_PrefixConfigObject + type, - m_PrefixConfigCheckSum + type, m_PrefixConfigObject + type + ":customvar", }; diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index b7edb6f0c..29cb82932 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -44,7 +44,7 @@ private: void UpdateAllConfigObjects(); std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); void DeleteKeys(const std::vector& keys, RedisConnection::QueryPriority priority); - std::vector GetTypeObjectKeys(const String& type); + std::vector GetTypeOverwriteKeys(const String& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, std::map>& publishes, bool runtimeUpdate); void UpdateState(const Checkable::Ptr& checkable);