mirror of https://github.com/Icinga/icinga2.git
Merge pull request #8650 from Icinga/feature/icingadb-delta
Icinga DB: re-insert only changed object attributes
This commit is contained in:
commit
a4cf81194a
|
@ -32,6 +32,7 @@
|
|||
#include <iterator>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <utility>
|
||||
|
||||
|
@ -181,20 +182,66 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
upq.ParallelFor(types, [this](const TypePair& type) {
|
||||
String lcType = type.second;
|
||||
|
||||
std::vector<String> keys = GetTypeObjectKeys(lcType);
|
||||
std::vector<String> keys = GetTypeOverwriteKeys(lcType);
|
||||
DeleteKeys(keys, Prio::Config);
|
||||
|
||||
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
|
||||
|
||||
WorkQueue upqObjectType(25000, Configuration::Concurrency);
|
||||
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
|
||||
|
||||
upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) {
|
||||
std::map<String, String> redisCheckSums;
|
||||
String configCheckSum = m_PrefixConfigCheckSum + lcType;
|
||||
|
||||
upqObjectType.Enqueue([this, &configCheckSum, &redisCheckSums]() {
|
||||
String cursor = "0";
|
||||
|
||||
do {
|
||||
Array::Ptr res = m_Rcon->GetResultOfQuery({
|
||||
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
|
||||
}, Prio::Config);
|
||||
|
||||
Array::Ptr kvs = res->Get(1);
|
||||
Value* key = nullptr;
|
||||
ObjectLock oLock (kvs);
|
||||
|
||||
for (auto& kv : kvs) {
|
||||
if (key) {
|
||||
redisCheckSums.emplace(std::move(*key), std::move(kv));
|
||||
key = nullptr;
|
||||
} else {
|
||||
key = &kv;
|
||||
}
|
||||
}
|
||||
|
||||
cursor = res->Get(0);
|
||||
} while (cursor != "0");
|
||||
});
|
||||
|
||||
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
|
||||
String configObject = m_PrefixConfigObject + lcType;
|
||||
|
||||
// Skimmed away attributes and checksums HMSETs' keys and values by Redis key.
|
||||
std::map<String, std::vector<std::vector<String>>> ourContentRaw {{configCheckSum, {}}, {configObject, {}}};
|
||||
std::mutex ourContentMutex;
|
||||
|
||||
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
|
||||
std::map<String, std::vector<String>> hMSets, publishes;
|
||||
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType};
|
||||
std::vector<std::vector<String> > transaction = {{"MULTI"}};
|
||||
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
|
||||
|
||||
auto skimObjects ([&]() {
|
||||
std::lock_guard<std::mutex> l (ourContentMutex);
|
||||
|
||||
for (auto& kv : ourContentRaw) {
|
||||
auto pos (hMSets.find(kv.first));
|
||||
|
||||
if (pos != hMSets.end()) {
|
||||
kv.second.emplace_back(std::move(pos->second));
|
||||
hMSets.erase(pos);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
bool dumpState = (lcType == "host" || lcType == "service");
|
||||
|
||||
size_t bulkCounter = 0;
|
||||
|
@ -212,6 +259,8 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
|
||||
bulkCounter++;
|
||||
if (!(bulkCounter % 100)) {
|
||||
skimObjects();
|
||||
|
||||
for (auto& kv : hMSets) {
|
||||
if (!kv.second.empty()) {
|
||||
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
|
||||
|
@ -265,6 +314,8 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
}
|
||||
}
|
||||
|
||||
skimObjects();
|
||||
|
||||
for (auto& kv : hMSets) {
|
||||
if (!kv.second.empty()) {
|
||||
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
|
||||
|
@ -313,6 +364,127 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
}
|
||||
}
|
||||
|
||||
std::map<String, std::map<String, String>> ourContent;
|
||||
|
||||
for (auto& source : ourContentRaw) {
|
||||
upqObjectType.Enqueue([&]() {
|
||||
auto& dest (ourContent[source.first]);
|
||||
|
||||
for (auto& hMSet : source.second) {
|
||||
for (decltype(hMSet.size()) i = 0, stop = hMSet.size() - 1u; i < stop; i += 2u) {
|
||||
dest.emplace(std::move(hMSet[i]), std::move(hMSet[i + 1u]));
|
||||
}
|
||||
|
||||
hMSet.clear();
|
||||
}
|
||||
|
||||
source.second.clear();
|
||||
});
|
||||
}
|
||||
|
||||
upqObjectType.Join();
|
||||
ourContentRaw.clear();
|
||||
|
||||
auto& ourCheckSums (ourContent[configCheckSum]);
|
||||
auto& ourObjects (ourContent[configObject]);
|
||||
std::vector<String> setChecksum, setObject, delChecksum, delObject;
|
||||
|
||||
auto redisCurrent (redisCheckSums.begin());
|
||||
auto redisEnd (redisCheckSums.end());
|
||||
auto ourCurrent (ourCheckSums.begin());
|
||||
auto ourEnd (ourCheckSums.end());
|
||||
|
||||
auto flushSets ([&]() {
|
||||
setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum});
|
||||
setObject.insert(setObject.begin(), {"HMSET", configObject});
|
||||
|
||||
std::vector<std::vector<String>> transaction;
|
||||
|
||||
transaction.emplace_back(std::vector<String>{"MULTI"});
|
||||
transaction.emplace_back(std::move(setChecksum));
|
||||
transaction.emplace_back(std::move(setObject));
|
||||
transaction.emplace_back(std::vector<String>{"EXEC"});
|
||||
|
||||
setChecksum.clear();
|
||||
setObject.clear();
|
||||
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
});
|
||||
|
||||
auto flushDels ([&]() {
|
||||
delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum});
|
||||
delObject.insert(delObject.begin(), {"HDEL", configObject});
|
||||
|
||||
std::vector<std::vector<String>> transaction;
|
||||
|
||||
transaction.emplace_back(std::vector<String>{"MULTI"});
|
||||
transaction.emplace_back(std::move(delChecksum));
|
||||
transaction.emplace_back(std::move(delObject));
|
||||
transaction.emplace_back(std::vector<String>{"EXEC"});
|
||||
|
||||
delChecksum.clear();
|
||||
delObject.clear();
|
||||
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
});
|
||||
|
||||
auto setOne ([&]() {
|
||||
setChecksum.emplace_back(ourCurrent->first);
|
||||
setChecksum.emplace_back(ourCurrent->second);
|
||||
setObject.emplace_back(ourCurrent->first);
|
||||
setObject.emplace_back(ourObjects[ourCurrent->first]);
|
||||
|
||||
if (setChecksum.size() == 100u) {
|
||||
flushSets();
|
||||
}
|
||||
});
|
||||
|
||||
auto delOne ([&]() {
|
||||
delChecksum.emplace_back(redisCurrent->first);
|
||||
delObject.emplace_back(redisCurrent->first);
|
||||
|
||||
if (delChecksum.size() == 100u) {
|
||||
flushDels();
|
||||
}
|
||||
});
|
||||
|
||||
for (;;) {
|
||||
if (redisCurrent == redisEnd) {
|
||||
for (; ourCurrent != ourEnd; ++ourCurrent) {
|
||||
setOne();
|
||||
}
|
||||
|
||||
break;
|
||||
} else if (ourCurrent == ourEnd) {
|
||||
for (; redisCurrent != redisEnd; ++redisCurrent) {
|
||||
delOne();
|
||||
}
|
||||
|
||||
break;
|
||||
} else if (redisCurrent->first < ourCurrent->first) {
|
||||
delOne();
|
||||
++redisCurrent;
|
||||
} else if (redisCurrent->first > ourCurrent->first) {
|
||||
setOne();
|
||||
++ourCurrent;
|
||||
} else {
|
||||
if (redisCurrent->second != ourCurrent->second) {
|
||||
setOne();
|
||||
}
|
||||
|
||||
++redisCurrent;
|
||||
++ourCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
if (delChecksum.size()) {
|
||||
flushDels();
|
||||
}
|
||||
|
||||
if (setChecksum.size()) {
|
||||
flushSets();
|
||||
}
|
||||
|
||||
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config);
|
||||
});
|
||||
|
||||
|
@ -371,11 +543,9 @@ void IcingaDB::DeleteKeys(const std::vector<String>& keys, RedisConnection::Quer
|
|||
m_Rcon->FireAndForgetQuery(std::move(query), priority);
|
||||
}
|
||||
|
||||
std::vector<String> IcingaDB::GetTypeObjectKeys(const String& type)
|
||||
std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
|
||||
{
|
||||
std::vector<String> keys = {
|
||||
m_PrefixConfigObject + type,
|
||||
m_PrefixConfigCheckSum + type,
|
||||
m_PrefixConfigObject + type + ":customvar",
|
||||
};
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ private:
|
|||
void UpdateAllConfigObjects();
|
||||
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
|
||||
void DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority);
|
||||
std::vector<String> GetTypeObjectKeys(const String& type);
|
||||
std::vector<String> GetTypeOverwriteKeys(const String& type);
|
||||
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
|
||||
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
|
||||
void UpdateState(const Checkable::Ptr& checkable);
|
||||
|
|
Loading…
Reference in New Issue