Merge pull request #7661 from Icinga/feature/icingadb-prio

RedisConnection: introduce extensible priorities
This commit is contained in:
Noah Hilverling 2019-12-03 07:12:29 +01:00 committed by GitHub
commit bb08f190a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 65 additions and 56 deletions

View File

@ -36,6 +36,8 @@
using namespace icinga; using namespace icinga;
using Prio = RedisConnection::QueryPriority;
static const char * const l_LuaResetDump = R"EOF( static const char * const l_LuaResetDump = R"EOF(
local id = redis.call('XADD', KEYS[1], '*', 'type', '*', 'state', 'wip') local id = redis.call('XADD', KEYS[1], '*', 'type', '*', 'state', 'wip')
@ -110,7 +112,7 @@ void IcingaDB::UpdateAllConfigObjects()
types.emplace_back(ctype, lcType); types.emplace_back(ctype, lcType);
} }
m_Rcon->FireAndForgetQuery({"EVAL", l_LuaResetDump, "1", "icinga:dump"}); m_Rcon->FireAndForgetQuery({"EVAL", l_LuaResetDump, "1", "icinga:dump"}, Prio::Config);
const std::vector<String> globalKeys = { const std::vector<String> globalKeys = {
m_PrefixConfigObject + "customvar", m_PrefixConfigObject + "customvar",
@ -118,13 +120,13 @@ void IcingaDB::UpdateAllConfigObjects()
m_PrefixConfigObject + "notes_url", m_PrefixConfigObject + "notes_url",
m_PrefixConfigObject + "icon_image", m_PrefixConfigObject + "icon_image",
}; };
DeleteKeys(globalKeys); DeleteKeys(globalKeys, Prio::Config);
upq.ParallelFor(types, [this](const TypePair& type) { upq.ParallelFor(types, [this](const TypePair& type) {
String lcType = type.second; String lcType = type.second;
std::vector<String> keys = GetTypeObjectKeys(lcType); std::vector<String> keys = GetTypeObjectKeys(lcType);
DeleteKeys(keys); DeleteKeys(keys, Prio::Config);
auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); auto objectChunks (ChunkObjects(type.first->GetObjects(), 500));
@ -183,7 +185,7 @@ void IcingaDB::UpdateAllConfigObjects()
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
transaction = {{"MULTI"}}; transaction = {{"MULTI"}};
} }
} }
@ -214,7 +216,7 @@ void IcingaDB::UpdateAllConfigObjects()
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
} }
Log(LogNotice, "IcingaDB") Log(LogNotice, "IcingaDB")
@ -231,7 +233,7 @@ void IcingaDB::UpdateAllConfigObjects()
} }
} }
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}); m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}, Prio::Config);
}); });
upq.Join(); upq.Join();
@ -249,7 +251,7 @@ void IcingaDB::UpdateAllConfigObjects()
} }
} }
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", "*", "state", "done"}); m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", "*", "state", "done"}, Prio::Config);
Log(LogInformation, "IcingaDB") Log(LogInformation, "IcingaDB")
<< "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
@ -275,13 +277,13 @@ std::vector<std::vector<intrusive_ptr<ConfigObject>>> IcingaDB::ChunkObjects(std
return std::move(chunks); return std::move(chunks);
} }
void IcingaDB::DeleteKeys(const std::vector<String>& keys) { void IcingaDB::DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority) {
std::vector<String> query = {"DEL"}; std::vector<String> query = {"DEL"};
for (auto& key : keys) { for (auto& key : keys) {
query.emplace_back(key); query.emplace_back(key);
} }
m_Rcon->FireAndForgetQuery(std::move(query)); m_Rcon->FireAndForgetQuery(std::move(query), priority);
} }
std::vector<String> IcingaDB::GetTypeObjectKeys(const String& type) std::vector<String> IcingaDB::GetTypeObjectKeys(const String& type)
@ -707,7 +709,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable)
{ {
Dictionary::Ptr stateAttrs = SerializeState(checkable); Dictionary::Ptr stateAttrs = SerializeState(checkable);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State);
} }
// Used to update a single object, used for runtime updates // Used to update a single object, used for runtime updates
@ -725,7 +727,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) { if (checkable) {
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}); m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State);
publishes["icinga:config:update"].emplace_back("state:" + typeName + ":" + objectKey); publishes["icinga:config:update"].emplace_back("state:" + typeName + ":" + objectKey);
} }
@ -753,7 +755,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
} }
} }
@ -1073,7 +1075,7 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
{"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"HDEL", m_PrefixConfigObject + typeName, objectKey},
{"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, {"DEL", m_PrefixStateObject + typeName + ":" + objectKey},
{"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey}
}); }, Prio::Config);
} }
static inline static inline
@ -1117,7 +1119,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu
streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); streamadd.emplace_back(Utility::ValidateUTF8(kv.second));
} }
m_Rcon->FireAndForgetQuery(std::move(streamadd)); m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State);
int hard_state; int hard_state;
if (!cr) { if (!cr) {
@ -1178,7 +1180,7 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu
xAdd.emplace_back(GetObjectIdentifier(endpoint)); xAdd.emplace_back(GetObjectIdentifier(endpoint));
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
void IcingaDB::SendSentNotification( void IcingaDB::SendSentNotification(
@ -1235,7 +1237,7 @@ void IcingaDB::SendSentNotification(
xAdd.emplace_back(GetObjectIdentifier(endpoint)); xAdd.emplace_back(GetObjectIdentifier(endpoint));
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
for (const User::Ptr& user : users) { for (const User::Ptr& user : users) {
auto userId = GetObjectIdentifier(user); auto userId = GetObjectIdentifier(user);
@ -1247,7 +1249,7 @@ void IcingaDB::SendSentNotification(
"user_id", GetObjectIdentifier(user), "user_id", GetObjectIdentifier(user),
}); });
m_Rcon->FireAndForgetQuery(std::move(xAddUser)); m_Rcon->FireAndForgetQuery(std::move(xAddUser), Prio::History);
} }
} }
@ -1317,7 +1319,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(GetObjectIdentifier(endpoint)); xAdd.emplace_back(GetObjectIdentifier(endpoint));
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
@ -1389,7 +1391,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
xAdd.emplace_back(GetObjectIdentifier(endpoint)); xAdd.emplace_back(GetObjectIdentifier(endpoint));
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
void IcingaDB::SendAddedComment(const Comment::Ptr& comment) void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
@ -1444,7 +1446,7 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
} }
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
@ -1509,7 +1511,7 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
} }
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value) void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value)
@ -1551,7 +1553,7 @@ void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value&
xAdd.emplace_back(GetObjectIdentifier(endpoint)); xAdd.emplace_back(GetObjectIdentifier(endpoint));
} }
m_Rcon->FireAndForgetQuery(std::move(xAdd)); m_Rcon->FireAndForgetQuery(std::move(xAdd), Prio::History);
} }
Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable)

View File

@ -15,6 +15,8 @@ using namespace icinga;
#define MAX_EVENTS_DEFAULT 5000 #define MAX_EVENTS_DEFAULT 5000
using Prio = RedisConnection::QueryPriority;
REGISTER_TYPE(IcingaDB); REGISTER_TYPE(IcingaDB);
IcingaDB::IcingaDB() IcingaDB::IcingaDB()
@ -120,7 +122,7 @@ void IcingaDB::PublishStats()
status->Set("config_dump_in_progress", m_ConfigDumpInProgress); status->Set("config_dump_in_progress", m_ConfigDumpInProgress);
String jsonStats = JsonEncode(status); String jsonStats = JsonEncode(status);
m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, true); m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, Prio::Heartbeat);
} }
void IcingaDB::HandleEvents() void IcingaDB::HandleEvents()

View File

@ -45,7 +45,7 @@ private:
/* config & status dump */ /* config & status dump */
void UpdateAllConfigObjects(); void UpdateAllConfigObjects();
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize); std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys); void DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority);
std::vector<String> GetTypeObjectKeys(const String& type); std::vector<String> GetTypeObjectKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate); std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);

View File

@ -68,7 +68,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
} }
} }
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool highPrio) void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{ {
{ {
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
@ -77,13 +77,13 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool high
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQuery)::element_type>(std::move(query))); auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQuery)::element_type>(std::move(query)));
asio::post(m_Strand, [this, item, highPrio]() { asio::post(m_Strand, [this, item, priority]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{item, nullptr, nullptr, nullptr}); m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
m_QueuedWrites.Set(); m_QueuedWrites.Set();
}); });
} }
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, bool highPrio) void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{ {
for (auto& query : queries) { for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
@ -92,13 +92,13 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, boo
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQueries)::element_type>(std::move(queries))); auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQueries)::element_type>(std::move(queries)));
asio::post(m_Strand, [this, item, highPrio]() { asio::post(m_Strand, [this, item, priority]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, item, nullptr, nullptr}); m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
m_QueuedWrites.Set(); m_QueuedWrites.Set();
}); });
} }
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, bool highPrio) RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{ {
{ {
Log msg (LogNotice, "IcingaDB", "Executing query:"); Log msg (LogNotice, "IcingaDB", "Executing query:");
@ -109,8 +109,8 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
auto future (promise.get_future()); auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultOfQuery)::element_type>(std::move(query), std::move(promise))); auto item (std::make_shared<decltype(WriteQueueItem().GetResultOfQuery)::element_type>(std::move(query), std::move(promise)));
asio::post(m_Strand, [this, item, highPrio]() { asio::post(m_Strand, [this, item, priority]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, item, nullptr}); m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
m_QueuedWrites.Set(); m_QueuedWrites.Set();
}); });
@ -119,7 +119,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
return future.get(); return future.get();
} }
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, bool highPrio) RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{ {
for (auto& query : queries) { for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Executing query:"); Log msg (LogNotice, "IcingaDB", "Executing query:");
@ -130,8 +130,8 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
auto future (promise.get_future()); auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultsOfQueries)::element_type>(std::move(queries), std::move(promise))); auto item (std::make_shared<decltype(WriteQueueItem().GetResultsOfQueries)::element_type>(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item, highPrio]() { asio::post(m_Strand, [this, item, priority]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, nullptr, item}); m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
m_QueuedWrites.Set(); m_QueuedWrites.Set();
}); });
@ -267,22 +267,18 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
for (;;) { for (;;) {
m_QueuedWrites.Wait(yc); m_QueuedWrites.Wait(yc);
for (;;) { WriteFirstOfHighestPrio:
if (m_Queues.HighPrioWrites.empty()) { for (auto& queue : m_Queues.Writes) {
if (m_Queues.Writes.empty()) { if (queue.second.empty()) {
break; continue;
} else {
auto next (std::move(m_Queues.Writes.front()));
m_Queues.Writes.pop();
WriteItem(yc, std::move(next));
}
} else {
auto next (std::move(m_Queues.HighPrioWrites.front()));
m_Queues.HighPrioWrites.pop();
WriteItem(yc, std::move(next));
} }
auto next (std::move(queue.second.front()));
queue.second.pop();
WriteItem(yc, std::move(next));
goto WriteFirstOfHighestPrio;
} }
m_QueuedWrites.Clear(); m_QueuedWrites.Clear();

View File

@ -27,6 +27,7 @@
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <future> #include <future>
#include <map>
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <stdexcept> #include <stdexcept>
@ -50,6 +51,14 @@ namespace icinga
typedef Value Reply; typedef Value Reply;
typedef std::vector<Reply> Replies; typedef std::vector<Reply> Replies;
enum class QueryPriority : unsigned char
{
Heartbeat,
Config,
State,
History
};
RedisConnection(const String& host, const int port, const String& path, RedisConnection(const String& host, const int port, const String& path,
const String& password = "", const int db = 0); const String& password = "", const int db = 0);
@ -57,11 +66,11 @@ namespace icinga
bool IsConnected(); bool IsConnected();
void FireAndForgetQuery(Query query, bool highPrio = false); void FireAndForgetQuery(Query query, QueryPriority priority);
void FireAndForgetQueries(Queries queries, bool highPrio = false); void FireAndForgetQueries(Queries queries, QueryPriority priority);
Reply GetResultOfQuery(Query query, bool highPrio = false); Reply GetResultOfQuery(Query query, QueryPriority priority);
Replies GetResultsOfQueries(Queries queries, bool highPrio = false); Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
private: private:
enum class ResponseAction : unsigned char enum class ResponseAction : unsigned char
@ -128,7 +137,7 @@ namespace icinga
Atomic<bool> m_Connecting, m_Connected, m_Started; Atomic<bool> m_Connecting, m_Connected, m_Started;
struct { struct {
std::queue<WriteQueueItem> Writes, HighPrioWrites; std::map<QueryPriority, std::queue<WriteQueueItem>> Writes;
std::queue<std::promise<Reply>> ReplyPromises; std::queue<std::promise<Reply>> ReplyPromises;
std::queue<std::promise<Replies>> RepliesPromises; std::queue<std::promise<Replies>> RepliesPromises;
std::queue<FutureResponseAction> FutureResponseActions; std::queue<FutureResponseAction> FutureResponseActions;