diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 0ceaa4430..ac5669595 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -55,6 +55,27 @@ return id INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize); +std::vector IcingaDB::GetTypes() +{ + return { + CheckCommand::TypeInstance, + Comment::TypeInstance, + Downtime::TypeInstance, + Endpoint::TypeInstance, + EventCommand::TypeInstance, + Host::TypeInstance, + HostGroup::TypeInstance, + Notification::TypeInstance, + NotificationCommand::TypeInstance, + Service::TypeInstance, + ServiceGroup::TypeInstance, + TimePeriod::TypeInstance, + User::TypeInstance, + UserGroup::TypeInstance, + Zone::TypeInstance + }; +} + void IcingaDB::ConfigStaticInitialize() { /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ @@ -125,23 +146,7 @@ void IcingaDB::UpdateAllConfigObjects() WorkQueue upq(25000, Configuration::Concurrency); upq.SetName("IcingaDB:ConfigDump"); - std::vector types = { - CheckCommand::TypeInstance, - Comment::TypeInstance, - Downtime::TypeInstance, - Endpoint::TypeInstance, - EventCommand::TypeInstance, - Host::TypeInstance, - HostGroup::TypeInstance, - Notification::TypeInstance, - NotificationCommand::TypeInstance, - Service::TypeInstance, - ServiceGroup::TypeInstance, - TimePeriod::TypeInstance, - User::TypeInstance, - UserGroup::TypeInstance, - Zone::TypeInstance - }; + std::vector types = GetTypes(); m_Rcon->SuppressQueryKind(Prio::CheckResult); m_Rcon->SuppressQueryKind(Prio::State); @@ -159,8 +164,9 @@ void IcingaDB::UpdateAllConfigObjects() m_PrefixConfigObject + "notes:url", m_PrefixConfigObject + "icon:image", }; - DeleteKeys(globalKeys, Prio::Config); - DeleteKeys({"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::CheckResult); + DeleteKeys(m_Rcon, globalKeys, Prio::Config); + DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::CheckResult); + m_Rcon->Sync(); Defer resetDumpedGlobals ([this]() { m_DumpedGlobals.CustomVar.Reset(); @@ -175,8 +181,10 @@ void IcingaDB::UpdateAllConfigObjects() if (!ctype) return; + auto& rcon (m_Rcons.at(ctype)); + std::vector keys = GetTypeOverwriteKeys(lcType); - DeleteKeys(keys, Prio::Config); + DeleteKeys(rcon, keys, Prio::Config); WorkQueue upqObjectType(25000, Configuration::Concurrency); upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); @@ -184,11 +192,11 @@ void IcingaDB::UpdateAllConfigObjects() std::map redisCheckSums; String configCheckSum = m_PrefixConfigCheckSum + lcType; - upqObjectType.Enqueue([this, &configCheckSum, &redisCheckSums]() { + upqObjectType.Enqueue([&]() { String cursor = "0"; do { - Array::Ptr res = m_Rcon->GetResultOfQuery({ + Array::Ptr res = rcon->GetResultOfQuery({ "HSCAN", configCheckSum, cursor, "COUNT", "1000" }, Prio::Config); @@ -282,7 +290,7 @@ void IcingaDB::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); transaction = {{"MULTI"}}; } } @@ -298,7 +306,7 @@ void IcingaDB::UpdateAllConfigObjects() if (zAdds->size() >= 102u) { std::vector header (zAdds->begin(), zAdds->begin() + 2u); - m_Rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); + rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); *zAdds = std::move(header); } @@ -319,12 +327,12 @@ void IcingaDB::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); } for (auto zAdds : {&hostZAdds, &serviceZAdds}) { if (zAdds->size() > 2u) { - m_Rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); + rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); } } @@ -386,7 +394,7 @@ void IcingaDB::UpdateAllConfigObjects() setChecksum.clear(); setObject.clear(); - m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); }); auto flushDels ([&]() { @@ -403,7 +411,7 @@ void IcingaDB::UpdateAllConfigObjects() delChecksum.clear(); delObject.clear(); - m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); }); auto setOne ([&]() { @@ -464,8 +472,9 @@ void IcingaDB::UpdateAllConfigObjects() } for (auto& key : GetTypeDumpSignalKeys(type)) { - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); + rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); } + rcon->Sync(); }); upq.Join(); @@ -518,13 +527,13 @@ std::vector>> IcingaDB::ChunkObjects(std return std::move(chunks); } -void IcingaDB::DeleteKeys(const std::vector& keys, RedisConnection::QueryPriority priority) { +void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys, RedisConnection::QueryPriority priority) { std::vector query = {"DEL"}; for (auto& key : keys) { query.emplace_back(key); } - m_Rcon->FireAndForgetQuery(std::move(query), priority); + conn->FireAndForgetQuery(std::move(query), priority); } std::vector IcingaDB::GetTypeOverwriteKeys(const String& type) diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index ad97ad153..6d6b501fa 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -77,6 +77,16 @@ void IcingaDB::Start(bool runtimeCreated) }); m_Rcon->Start(); + for (const Type::Ptr& type : GetTypes()) { + auto ctype (dynamic_cast(type.get())); + if (!ctype) + continue; + + RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex())); + rCon->Start(); + m_Rcons[ctype] = std::move(rCon); + } + m_StatsTimer = new Timer(); m_StatsTimer->SetInterval(1); m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); }); diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 2cdc30a16..0ccbdd5b1 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include namespace icinga @@ -57,7 +58,7 @@ private: /* config & status dump */ void UpdateAllConfigObjects(); std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); - void DeleteKeys(const std::vector& keys, RedisConnection::QueryPriority priority); + void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys, RedisConnection::QueryPriority priority); std::vector GetTypeOverwriteKeys(const String& type); std::vector GetTypeDumpSignalKeys(const Type::Ptr& type); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, @@ -150,6 +151,8 @@ private: return std::move(haystack); } + static std::vector GetTypes(); + Timer::Ptr m_StatsTimer; WorkQueue m_WorkQueue; @@ -160,6 +163,7 @@ private: bool m_ConfigDumpDone; RedisConnection::Ptr m_Rcon; + std::unordered_map m_Rcons; struct { DumpedGlobals CustomVar, ActionUrl, NotesUrl, IconImage; diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index 7380ddd60..a605e7967 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -187,6 +187,17 @@ void RedisConnection::EnqueueCallback(const std::function& callback, QueryPriority priority); + void Sync(); void SuppressQueryKind(QueryPriority kind); void UnsuppressQueryKind(QueryPriority kind);