Icinga DB: re-insert only changed object attributes

This commit is contained in:
Alexander A. Klimov 2021-03-04 17:29:39 +01:00
parent aa4f53009e
commit 4eddac0e64
2 changed files with 178 additions and 8 deletions

View File

@ -32,6 +32,7 @@
#include <iterator> #include <iterator>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex>
#include <set> #include <set>
#include <utility> #include <utility>
@ -158,20 +159,66 @@ void IcingaDB::UpdateAllConfigObjects()
upq.ParallelFor(types, [this](const TypePair& type) { upq.ParallelFor(types, [this](const TypePair& type) {
String lcType = type.second; String lcType = type.second;
std::vector<String> keys = GetTypeObjectKeys(lcType); std::vector<String> keys = GetTypeOverwriteKeys(lcType);
DeleteKeys(keys, Prio::Config); DeleteKeys(keys, Prio::Config);
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
WorkQueue upqObjectType(25000, Configuration::Concurrency); WorkQueue upqObjectType(25000, Configuration::Concurrency);
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) { std::map<String, String> redisCheckSums;
String configCheckSum = m_PrefixConfigCheckSum + lcType;
upqObjectType.Enqueue([this, &configCheckSum, &redisCheckSums]() {
String cursor = "0";
do {
Array::Ptr res = m_Rcon->GetResultOfQuery({
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
}, Prio::Config);
Array::Ptr kvs = res->Get(1);
Value* key = nullptr;
ObjectLock oLock (kvs);
for (auto& kv : kvs) {
if (key) {
redisCheckSums.emplace(std::move(*key), std::move(kv));
key = nullptr;
} else {
key = &kv;
}
}
cursor = res->Get(0);
} while (cursor != "0");
});
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
String configObject = m_PrefixConfigObject + lcType;
// Skimmed away attributes and checksums HMSETs' keys and values by Redis key.
std::map<String, std::vector<std::vector<String>>> ourContentRaw {{configCheckSum, {}}, {configObject, {}}};
std::mutex ourContentMutex;
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> hMSets, publishes; std::map<String, std::vector<String>> hMSets, publishes;
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType}; std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
std::vector<std::vector<String> > transaction = {{"MULTI"}}; std::vector<std::vector<String> > transaction = {{"MULTI"}};
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
auto skimObjects ([&]() {
std::lock_guard<std::mutex> l (ourContentMutex);
for (auto& kv : ourContentRaw) {
auto pos (hMSets.find(kv.first));
if (pos != hMSets.end()) {
kv.second.emplace_back(std::move(pos->second));
hMSets.erase(pos);
}
}
});
bool dumpState = (lcType == "host" || lcType == "service"); bool dumpState = (lcType == "host" || lcType == "service");
size_t bulkCounter = 0; size_t bulkCounter = 0;
@ -189,6 +236,8 @@ void IcingaDB::UpdateAllConfigObjects()
bulkCounter++; bulkCounter++;
if (!(bulkCounter % 100)) { if (!(bulkCounter % 100)) {
skimObjects();
for (auto& kv : hMSets) { for (auto& kv : hMSets) {
if (!kv.second.empty()) { if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
@ -242,6 +291,8 @@ void IcingaDB::UpdateAllConfigObjects()
} }
} }
skimObjects();
for (auto& kv : hMSets) { for (auto& kv : hMSets) {
if (!kv.second.empty()) { if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
@ -290,6 +341,127 @@ void IcingaDB::UpdateAllConfigObjects()
} }
} }
std::map<String, std::map<String, String>> ourContent;
for (auto& source : ourContentRaw) {
upqObjectType.Enqueue([&]() {
auto& dest (ourContent[source.first]);
for (auto& hMSet : source.second) {
for (decltype(hMSet.size()) i = 0, stop = hMSet.size() - 1u; i < stop; i += 2u) {
dest.emplace(std::move(hMSet[i]), std::move(hMSet[i + 1u]));
}
hMSet.clear();
}
source.second.clear();
});
}
upqObjectType.Join();
ourContentRaw.clear();
auto& ourCheckSums (ourContent[configCheckSum]);
auto& ourObjects (ourContent[configObject]);
std::vector<String> setChecksum, setObject, delChecksum, delObject;
auto redisCurrent (redisCheckSums.begin());
auto redisEnd (redisCheckSums.end());
auto ourCurrent (ourCheckSums.begin());
auto ourEnd (ourCheckSums.end());
auto flushSets ([&]() {
setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum});
setObject.insert(setObject.begin(), {"HMSET", configObject});
std::vector<std::vector<String>> transaction;
transaction.emplace_back(std::vector<String>{"MULTI"});
transaction.emplace_back(std::move(setChecksum));
transaction.emplace_back(std::move(setObject));
transaction.emplace_back(std::vector<String>{"EXEC"});
setChecksum.clear();
setObject.clear();
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
});
auto flushDels ([&]() {
delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum});
delObject.insert(delObject.begin(), {"HDEL", configObject});
std::vector<std::vector<String>> transaction;
transaction.emplace_back(std::vector<String>{"MULTI"});
transaction.emplace_back(std::move(delChecksum));
transaction.emplace_back(std::move(delObject));
transaction.emplace_back(std::vector<String>{"EXEC"});
delChecksum.clear();
delObject.clear();
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
});
auto setOne ([&]() {
setChecksum.emplace_back(ourCurrent->first);
setChecksum.emplace_back(ourCurrent->second);
setObject.emplace_back(ourCurrent->first);
setObject.emplace_back(ourObjects[ourCurrent->first]);
if (setChecksum.size() == 100u) {
flushSets();
}
});
auto delOne ([&]() {
delChecksum.emplace_back(redisCurrent->first);
delObject.emplace_back(redisCurrent->first);
if (delChecksum.size() == 100u) {
flushDels();
}
});
for (;;) {
if (redisCurrent == redisEnd) {
for (; ourCurrent != ourEnd; ++ourCurrent) {
setOne();
}
break;
} else if (ourCurrent == ourEnd) {
for (; redisCurrent != redisEnd; ++redisCurrent) {
delOne();
}
break;
} else if (redisCurrent->first < ourCurrent->first) {
delOne();
++redisCurrent;
} else if (redisCurrent->first > ourCurrent->first) {
setOne();
++ourCurrent;
} else {
if (redisCurrent->second != ourCurrent->second) {
setOne();
}
++redisCurrent;
++ourCurrent;
}
}
if (delChecksum.size()) {
flushDels();
}
if (setChecksum.size()) {
flushSets();
}
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config); m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config);
}); });
@ -348,11 +520,9 @@ void IcingaDB::DeleteKeys(const std::vector<String>& keys, RedisConnection::Quer
m_Rcon->FireAndForgetQuery(std::move(query), priority); m_Rcon->FireAndForgetQuery(std::move(query), priority);
} }
std::vector<String> IcingaDB::GetTypeObjectKeys(const String& type) std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
{ {
std::vector<String> keys = { std::vector<String> keys = {
m_PrefixConfigObject + type,
m_PrefixConfigCheckSum + type,
m_PrefixConfigObject + type + ":customvar", m_PrefixConfigObject + type + ":customvar",
}; };

View File

@ -44,7 +44,7 @@ private:
void UpdateAllConfigObjects(); void UpdateAllConfigObjects();
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize); std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority); void DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority);
std::vector<String> GetTypeObjectKeys(const String& type); std::vector<String> GetTypeOverwriteKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate); std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void UpdateState(const Checkable::Ptr& checkable); void UpdateState(const Checkable::Ptr& checkable);