IcingaDB: Introduce ExecuteRedisTransaction() helper method

This commit is contained in:
Yonas Habteab 2025-02-24 13:11:12 +01:00
parent db3f8dec27
commit aed1bb6294
2 changed files with 57 additions and 68 deletions

View File

@ -273,11 +273,6 @@ void IcingaDB::UpdateAllConfigObjects()
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> hMSets;
// Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded)
std::vector<String> states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
// Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded)
std::vector<String> statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
auto skimObjects ([&]() {
@ -317,9 +312,11 @@ void IcingaDB::UpdateAllConfigObjects()
String objectKey = GetObjectIdentifier(object);
Dictionary::Ptr state = SerializeState(dynamic_pointer_cast<Checkable>(object));
auto& states = hMSets[m_PrefixConfigObject + lcType + ":state"];
states.emplace_back(objectKey);
states.emplace_back(JsonEncode(state));
auto& statesChksms = hMSets[m_PrefixConfigCheckSum + lcType + ":state"];
statesChksms.emplace_back(objectKey);
statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}})));
}
@ -328,27 +325,9 @@ void IcingaDB::UpdateAllConfigObjects()
if (!(bulkCounter % 100)) {
skimObjects();
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
if (states.size() > 2) {
transaction.emplace_back(std::move(states));
transaction.emplace_back(std::move(statesChksms));
states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
}
ExecuteRedisTransaction(rcon, hMSets, {});
hMSets = decltype(hMSets)();
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
transaction = {{"MULTI"}};
}
}
auto checkable (dynamic_pointer_cast<Checkable>(object));
@ -371,22 +350,7 @@ void IcingaDB::UpdateAllConfigObjects()
skimObjects();
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
if (states.size() > 2) {
transaction.emplace_back(std::move(states));
transaction.emplace_back(std::move(statesChksms));
}
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
}
ExecuteRedisTransaction(rcon, hMSets, {});
for (auto zAdds : {&hostZAdds, &serviceZAdds}) {
if (zAdds->size() > 2u) {
@ -1472,34 +1436,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
}
std::vector<std::vector<String> > transaction = {{"MULTI"}};
for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}
for (auto& objectAttributes : runtimeUpdates) {
std::vector<String> xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
ObjectLock olock(objectAttributes);
for (const Dictionary::Pair& kv : objectAttributes) {
String value = IcingaToStreamValue(kv.second);
if (!value.IsEmpty()) {
xAdd.emplace_back(kv.first);
xAdd.emplace_back(value);
}
}
transaction.emplace_back(std::move(xAdd));
}
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
}
ExecuteRedisTransaction(m_Rcon, hMSets, runtimeUpdates);
if (checkable) {
SendNextUpdate(checkable);
@ -3296,3 +3233,52 @@ void IcingaDB::AddDataToHmSets(std::map<String, RedisConnection::Query>& hMSets,
query->emplace_back(id);
query->emplace_back(JsonEncode(data));
}
/**
* Execute the provided HMSET values and runtime updates in a single Redis transaction on the provided Redis connection.
*
* The HMSETs should just contain the necessary key value pairs to be set in Redis, i.e, without the HMSET command
* itself. This function will then go through each of the map keys and prepend the HMSET command when transforming the
* map into valid Redis queries. Likewise, the runtime updates should just contain the key value pairs to be streamed
* to the icinga:runtime pipeline, and this function will generate a XADD query for each one of the vector elements.
*
* @param rcon The Redis connection to execute the transaction on.
* @param hMSets A map of Redis keys and their respective HMSET values.
* @param runtimeUpdates A list of dictionaries to be sent to the icinga:runtime stream.
*/
void IcingaDB::ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::map<String, RedisConnection::Query>& hMSets,
const std::vector<Dictionary::Ptr>& runtimeUpdates)
{
RedisConnection::Queries transaction{{"MULTI"}};
for (auto& [redisKey, query] : hMSets) {
if (!query.empty()) {
query.insert(query.begin(), {"HSET", redisKey});
transaction.emplace_back(std::move(query));
}
}
for (auto& attrs : runtimeUpdates) {
RedisConnection::Query xAdd{"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"};
ObjectLock olock(attrs);
for (auto& [key, value] : attrs) {
if (auto streamVal(IcingaToStreamValue(value)); !streamVal.IsEmpty()) {
xAdd.emplace_back(key);
xAdd.emplace_back(std::move(streamVal));
}
}
transaction.emplace_back(std::move(xAdd));
}
if (transaction.size() > 1) {
transaction.emplace_back(RedisConnection::Query{"EXEC"});
if (!runtimeUpdates.empty()) {
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
} else {
// This is likely triggered by the initial Redis config dump, so a) we don't need to record the number of
// affected objects and b) we don't really know how many objects are going to be affected by this tx.
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
}
}
}

View File

@ -210,6 +210,9 @@ private:
static void CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
static void CustomVarsChangedHandler(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
static void ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::map<String, RedisConnection::Query>& hMSets,
const std::vector<Dictionary::Ptr>& runtimeUpdates);
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);