From e391c0e7b5b37954ceef513065c3ca837ab2763c Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 27 Nov 2019 11:08:22 +0100 Subject: [PATCH] RedisConnection: introduce extensible priorities --- lib/icingadb/icingadb-objects.cpp | 46 ++++++++++++++-------------- lib/icingadb/icingadb.cpp | 4 ++- lib/icingadb/icingadb.hpp | 2 +- lib/icingadb/redisconnection.cpp | 50 ++++++++++++++----------------- lib/icingadb/redisconnection.hpp | 19 ++++++++---- 5 files changed, 65 insertions(+), 56 deletions(-) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index a1a0ec1b5..1971f8030 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -36,6 +36,8 @@ using namespace icinga; +using Prio = RedisConnection::QueryPriority; + static const char * const l_LuaResetDump = R"EOF( local id = redis.call('XADD', KEYS[1], '*', 'type', '*', 'state', 'wip') @@ -110,7 +112,7 @@ void IcingaDB::UpdateAllConfigObjects() types.emplace_back(ctype, lcType); } - m_Rcon->FireAndForgetQuery({"EVAL", l_LuaResetDump, "1", "icinga:dump"}); + m_Rcon->FireAndForgetQuery({"EVAL", l_LuaResetDump, "1", "icinga:dump"}, Prio::Config); const std::vector globalKeys = { m_PrefixConfigObject + "customvar", @@ -118,13 +120,13 @@ void IcingaDB::UpdateAllConfigObjects() m_PrefixConfigObject + "notes_url", m_PrefixConfigObject + "icon_image", }; - DeleteKeys(globalKeys); + DeleteKeys(globalKeys, Prio::Config); upq.ParallelFor(types, [this](const TypePair& type) { String lcType = type.second; std::vector keys = GetTypeObjectKeys(lcType); - DeleteKeys(keys); + DeleteKeys(keys, Prio::Config); auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); @@ -183,7 +185,7 @@ void IcingaDB::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); transaction = {{"MULTI"}}; } } @@ -214,7 +216,7 @@ void IcingaDB::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); } Log(LogNotice, "IcingaDB") @@ -231,7 +233,7 @@ void IcingaDB::UpdateAllConfigObjects() } } - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}); + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config); }); upq.Join(); @@ -249,7 +251,7 @@ void IcingaDB::UpdateAllConfigObjects() } } - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", "*", "state", "done"}); + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", "*", "state", "done"}, Prio::Config); Log(LogInformation, "IcingaDB") << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; @@ -275,13 +277,13 @@ std::vector>> IcingaDB::ChunkObjects(std return std::move(chunks); } -void IcingaDB::DeleteKeys(const std::vector& keys) { +void IcingaDB::DeleteKeys(const std::vector& keys, RedisConnection::QueryPriority priority) { std::vector query = {"DEL"}; for (auto& key : keys) { query.emplace_back(key); } - m_Rcon->FireAndForgetQuery(std::move(query)); + m_Rcon->FireAndForgetQuery(std::move(query), priority); } std::vector IcingaDB::GetTypeObjectKeys(const String& type) @@ -707,7 +709,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable) { Dictionary::Ptr stateAttrs = SerializeState(checkable); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State); } // Used to update a single object, used for runtime updates @@ -725,7 +727,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd Checkable::Ptr checkable = dynamic_pointer_cast(object); if (checkable) { String objectKey = GetObjectIdentifier(object); - m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State); publishes["icinga:config:update"].emplace_back("state:" + typeName + ":" + objectKey); } @@ -753,7 +755,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); } } @@ -1073,7 +1075,7 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) {"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} - }); + }, Prio::Config); } static inline @@ -1117,7 +1119,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); } - m_Rcon->FireAndForgetQuery(std::move(streamadd)); + m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); int hard_state; if (!cr) { @@ -1178,7 +1180,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } void IcingaDB::SendSentNotification( @@ -1235,7 +1237,7 @@ void IcingaDB::SendSentNotification( xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); for (const User::Ptr& user : users) { auto userId = GetObjectIdentifier(user); @@ -1247,7 +1249,7 @@ void IcingaDB::SendSentNotification( "user_id", GetObjectIdentifier(user), }); - m_Rcon->FireAndForgetQuery(std::move(xAddUser)); + m_Rcon->FireAndForgetQuery(std::move(xAddUser), Prio::History); } } @@ -1317,7 +1319,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) @@ -1389,7 +1391,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } void IcingaDB::SendAddedComment(const Comment::Ptr& comment) @@ -1444,7 +1446,7 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) @@ -1509,7 +1511,7 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) } } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value) @@ -1551,7 +1553,7 @@ void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value& xAdd.emplace_back(GetObjectIdentifier(endpoint)); } - m_Rcon->FireAndForgetQuery(std::move(xAdd)); + m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History); } Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 2e46b054b..eece6c7dd 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -15,6 +15,8 @@ using namespace icinga; #define MAX_EVENTS_DEFAULT 5000 +using Prio = RedisConnection::QueryPriority; + REGISTER_TYPE(IcingaDB); IcingaDB::IcingaDB() @@ -120,7 +122,7 @@ void IcingaDB::PublishStats() status->Set("config_dump_in_progress", m_ConfigDumpInProgress); String jsonStats = JsonEncode(status); - m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, true); + m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, Prio::Heartbeat); } void IcingaDB::HandleEvents() diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 1a8b166c4..8cca4e390 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -45,7 +45,7 @@ private: /* config & status dump */ void UpdateAllConfigObjects(); std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); - void DeleteKeys(const std::vector& keys); + void DeleteKeys(const std::vector& keys, RedisConnection::QueryPriority priority); std::vector GetTypeObjectKeys(const String& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, std::map>& publishes, bool runtimeUpdate); diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index d7ef162af..bd94273d9 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -68,7 +68,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg) } } -void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool highPrio) +void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) { { Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); @@ -77,13 +77,13 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool high auto item (std::make_shared(std::move(query))); - asio::post(m_Strand, [this, item, highPrio]() { - (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{item, nullptr, nullptr, nullptr}); + asio::post(m_Strand, [this, item, priority]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr}); m_QueuedWrites.Set(); }); } -void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, bool highPrio) +void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) { for (auto& query : queries) { Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); @@ -92,13 +92,13 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, boo auto item (std::make_shared(std::move(queries))); - asio::post(m_Strand, [this, item, highPrio]() { - (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, item, nullptr, nullptr}); + asio::post(m_Strand, [this, item, priority]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr}); m_QueuedWrites.Set(); }); } -RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, bool highPrio) +RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) { { Log msg (LogNotice, "IcingaDB", "Executing query:"); @@ -109,8 +109,8 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query auto future (promise.get_future()); auto item (std::make_shared(std::move(query), std::move(promise))); - asio::post(m_Strand, [this, item, highPrio]() { - (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, item, nullptr}); + asio::post(m_Strand, [this, item, priority]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr}); m_QueuedWrites.Set(); }); @@ -119,7 +119,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query return future.get(); } -RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, bool highPrio) +RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) { for (auto& query : queries) { Log msg (LogNotice, "IcingaDB", "Executing query:"); @@ -130,8 +130,8 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q auto future (promise.get_future()); auto item (std::make_shared(std::move(queries), std::move(promise))); - asio::post(m_Strand, [this, item, highPrio]() { - (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, nullptr, item}); + asio::post(m_Strand, [this, item, priority]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item}); m_QueuedWrites.Set(); }); @@ -267,22 +267,18 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) for (;;) { m_QueuedWrites.Wait(yc); - for (;;) { - if (m_Queues.HighPrioWrites.empty()) { - if (m_Queues.Writes.empty()) { - break; - } else { - auto next (std::move(m_Queues.Writes.front())); - m_Queues.Writes.pop(); - - WriteItem(yc, std::move(next)); - } - } else { - auto next (std::move(m_Queues.HighPrioWrites.front())); - m_Queues.HighPrioWrites.pop(); - - WriteItem(yc, std::move(next)); + WriteFirstOfHighestPrio: + for (auto& queue : m_Queues.Writes) { + if (queue.second.empty()) { + continue; } + + auto next (std::move(queue.second.front())); + queue.second.pop(); + + WriteItem(yc, std::move(next)); + + goto WriteFirstOfHighestPrio; } m_QueuedWrites.Clear(); diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index aaa525a6f..41ec3857f 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -50,6 +51,14 @@ namespace icinga typedef Value Reply; typedef std::vector Replies; + enum class QueryPriority : unsigned char + { + Heartbeat, + Config, + State, + History + }; + RedisConnection(const String& host, const int port, const String& path, const String& password = "", const int db = 0); @@ -57,11 +66,11 @@ namespace icinga bool IsConnected(); - void FireAndForgetQuery(Query query, bool highPrio = false); - void FireAndForgetQueries(Queries queries, bool highPrio = false); + void FireAndForgetQuery(Query query, QueryPriority priority); + void FireAndForgetQueries(Queries queries, QueryPriority priority); - Reply GetResultOfQuery(Query query, bool highPrio = false); - Replies GetResultsOfQueries(Queries queries, bool highPrio = false); + Reply GetResultOfQuery(Query query, QueryPriority priority); + Replies GetResultsOfQueries(Queries queries, QueryPriority priority); private: enum class ResponseAction : unsigned char @@ -128,7 +137,7 @@ namespace icinga Atomic m_Connecting, m_Connected, m_Started; struct { - std::queue Writes, HighPrioWrites; + std::map> Writes; std::queue> ReplyPromises; std::queue> RepliesPromises; std::queue FutureResponseActions;