diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index ecae7c201..e407373fa 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -273,11 +273,6 @@ void IcingaDB::UpdateAllConfigObjects() upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { std::map> hMSets; - // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded) - std::vector states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; - // Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded) - std::vector statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; - std::vector > transaction = {{"MULTI"}}; std::vector hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; auto skimObjects ([&]() { @@ -317,9 +312,11 @@ void IcingaDB::UpdateAllConfigObjects() String objectKey = GetObjectIdentifier(object); Dictionary::Ptr state = SerializeState(dynamic_pointer_cast(object)); + auto& states = hMSets[m_PrefixConfigObject + lcType + ":state"]; states.emplace_back(objectKey); states.emplace_back(JsonEncode(state)); + auto& statesChksms = hMSets[m_PrefixConfigCheckSum + lcType + ":state"]; statesChksms.emplace_back(objectKey); statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}}))); } @@ -328,27 +325,9 @@ void IcingaDB::UpdateAllConfigObjects() if (!(bulkCounter % 100)) { skimObjects(); - for (auto& kv : hMSets) { - if (!kv.second.empty()) { - kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); - transaction.emplace_back(std::move(kv.second)); - } - } - - if (states.size() > 2) { - transaction.emplace_back(std::move(states)); - transaction.emplace_back(std::move(statesChksms)); - states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; - statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; - } + ExecuteRedisTransaction(rcon, hMSets, {}); hMSets = decltype(hMSets)(); - - if (transaction.size() > 1) { - transaction.push_back({"EXEC"}); - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); - transaction = {{"MULTI"}}; - } } auto checkable (dynamic_pointer_cast(object)); @@ -371,22 +350,7 @@ void IcingaDB::UpdateAllConfigObjects() skimObjects(); - for (auto& kv : hMSets) { - if (!kv.second.empty()) { - kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); - transaction.emplace_back(std::move(kv.second)); - } - } - - if (states.size() > 2) { - transaction.emplace_back(std::move(states)); - transaction.emplace_back(std::move(statesChksms)); - } - - if (transaction.size() > 1) { - transaction.push_back({"EXEC"}); - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); - } + ExecuteRedisTransaction(rcon, hMSets, {}); for (auto zAdds : {&hostZAdds, &serviceZAdds}) { if (zAdds->size() > 2u) { @@ -1472,34 +1436,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile); } - std::vector > transaction = {{"MULTI"}}; - - for (auto& kv : hMSets) { - if (!kv.second.empty()) { - kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); - transaction.emplace_back(std::move(kv.second)); - } - } - - for (auto& objectAttributes : runtimeUpdates) { - std::vector xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); - ObjectLock olock(objectAttributes); - - for (const Dictionary::Pair& kv : objectAttributes) { - String value = IcingaToStreamValue(kv.second); - if (!value.IsEmpty()) { - xAdd.emplace_back(kv.first); - xAdd.emplace_back(value); - } - } - - transaction.emplace_back(std::move(xAdd)); - } - - if (transaction.size() > 1) { - transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1}); - } + ExecuteRedisTransaction(m_Rcon, hMSets, runtimeUpdates); if (checkable) { SendNextUpdate(checkable); @@ -3296,3 +3233,52 @@ void IcingaDB::AddDataToHmSets(std::map& hMSets, query->emplace_back(id); query->emplace_back(JsonEncode(data)); } + +/** + * Execute the provided HMSET values and runtime updates in a single Redis transaction on the provided Redis connection. + * + * The HMSETs should just contain the necessary key value pairs to be set in Redis, i.e, without the HMSET command + * itself. This function will then go through each of the map keys and prepend the HMSET command when transforming the + * map into valid Redis queries. Likewise, the runtime updates should just contain the key value pairs to be streamed + * to the icinga:runtime pipeline, and this function will generate a XADD query for each one of the vector elements. + * + * @param rcon The Redis connection to execute the transaction on. + * @param hMSets A map of Redis keys and their respective HMSET values. + * @param runtimeUpdates A list of dictionaries to be sent to the icinga:runtime stream. + */ +void IcingaDB::ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::map& hMSets, + const std::vector& runtimeUpdates) +{ + RedisConnection::Queries transaction{{"MULTI"}}; + for (auto& [redisKey, query] : hMSets) { + if (!query.empty()) { + query.insert(query.begin(), {"HSET", redisKey}); + transaction.emplace_back(std::move(query)); + } + } + + for (auto& attrs : runtimeUpdates) { + RedisConnection::Query xAdd{"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}; + + ObjectLock olock(attrs); + for (auto& [key, value] : attrs) { + if (auto streamVal(IcingaToStreamValue(value)); !streamVal.IsEmpty()) { + xAdd.emplace_back(key); + xAdd.emplace_back(std::move(streamVal)); + } + } + + transaction.emplace_back(std::move(xAdd)); + } + + if (transaction.size() > 1) { + transaction.emplace_back(RedisConnection::Query{"EXEC"}); + if (!runtimeUpdates.empty()) { + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1}); + } else { + // This is likely triggered by the initial Redis config dump, so a) we don't need to record the number of + // affected objects and b) we don't really know how many objects are going to be affected by this tx. + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + } + } +} diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 73ee4e8ae..d8f7843a5 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -210,6 +210,9 @@ private: static void CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); static void CustomVarsChangedHandler(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::map& hMSets, + const std::vector& runtimeUpdates); + void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp);