mirror of https://github.com/Icinga/icinga2.git
Icinga DB: use one Redis connection per config object type
This commit is contained in:
parent
8b516f0c08
commit
d1e15a220c
|
@ -55,6 +55,27 @@ return id
|
|||
|
||||
INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize);
|
||||
|
||||
std::vector<Type::Ptr> IcingaDB::GetTypes()
|
||||
{
|
||||
return {
|
||||
CheckCommand::TypeInstance,
|
||||
Comment::TypeInstance,
|
||||
Downtime::TypeInstance,
|
||||
Endpoint::TypeInstance,
|
||||
EventCommand::TypeInstance,
|
||||
Host::TypeInstance,
|
||||
HostGroup::TypeInstance,
|
||||
Notification::TypeInstance,
|
||||
NotificationCommand::TypeInstance,
|
||||
Service::TypeInstance,
|
||||
ServiceGroup::TypeInstance,
|
||||
TimePeriod::TypeInstance,
|
||||
User::TypeInstance,
|
||||
UserGroup::TypeInstance,
|
||||
Zone::TypeInstance
|
||||
};
|
||||
}
|
||||
|
||||
void IcingaDB::ConfigStaticInitialize()
|
||||
{
|
||||
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
|
||||
|
@ -125,23 +146,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
WorkQueue upq(25000, Configuration::Concurrency);
|
||||
upq.SetName("IcingaDB:ConfigDump");
|
||||
|
||||
std::vector<Type::Ptr> types = {
|
||||
CheckCommand::TypeInstance,
|
||||
Comment::TypeInstance,
|
||||
Downtime::TypeInstance,
|
||||
Endpoint::TypeInstance,
|
||||
EventCommand::TypeInstance,
|
||||
Host::TypeInstance,
|
||||
HostGroup::TypeInstance,
|
||||
Notification::TypeInstance,
|
||||
NotificationCommand::TypeInstance,
|
||||
Service::TypeInstance,
|
||||
ServiceGroup::TypeInstance,
|
||||
TimePeriod::TypeInstance,
|
||||
User::TypeInstance,
|
||||
UserGroup::TypeInstance,
|
||||
Zone::TypeInstance
|
||||
};
|
||||
std::vector<Type::Ptr> types = GetTypes();
|
||||
|
||||
m_Rcon->SuppressQueryKind(Prio::CheckResult);
|
||||
m_Rcon->SuppressQueryKind(Prio::State);
|
||||
|
@ -159,8 +164,9 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
m_PrefixConfigObject + "notes:url",
|
||||
m_PrefixConfigObject + "icon:image",
|
||||
};
|
||||
DeleteKeys(globalKeys, Prio::Config);
|
||||
DeleteKeys({"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::CheckResult);
|
||||
DeleteKeys(m_Rcon, globalKeys, Prio::Config);
|
||||
DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::CheckResult);
|
||||
m_Rcon->Sync();
|
||||
|
||||
Defer resetDumpedGlobals ([this]() {
|
||||
m_DumpedGlobals.CustomVar.Reset();
|
||||
|
@ -175,8 +181,10 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
if (!ctype)
|
||||
return;
|
||||
|
||||
auto& rcon (m_Rcons.at(ctype));
|
||||
|
||||
std::vector<String> keys = GetTypeOverwriteKeys(lcType);
|
||||
DeleteKeys(keys, Prio::Config);
|
||||
DeleteKeys(rcon, keys, Prio::Config);
|
||||
|
||||
WorkQueue upqObjectType(25000, Configuration::Concurrency);
|
||||
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
|
||||
|
@ -184,11 +192,11 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
std::map<String, String> redisCheckSums;
|
||||
String configCheckSum = m_PrefixConfigCheckSum + lcType;
|
||||
|
||||
upqObjectType.Enqueue([this, &configCheckSum, &redisCheckSums]() {
|
||||
upqObjectType.Enqueue([&]() {
|
||||
String cursor = "0";
|
||||
|
||||
do {
|
||||
Array::Ptr res = m_Rcon->GetResultOfQuery({
|
||||
Array::Ptr res = rcon->GetResultOfQuery({
|
||||
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
|
||||
}, Prio::Config);
|
||||
|
||||
|
@ -282,7 +290,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
|
||||
if (transaction.size() > 1) {
|
||||
transaction.push_back({"EXEC"});
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
transaction = {{"MULTI"}};
|
||||
}
|
||||
}
|
||||
|
@ -298,7 +306,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
if (zAdds->size() >= 102u) {
|
||||
std::vector<String> header (zAdds->begin(), zAdds->begin() + 2u);
|
||||
|
||||
m_Rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult);
|
||||
rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult);
|
||||
|
||||
*zAdds = std::move(header);
|
||||
}
|
||||
|
@ -319,12 +327,12 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
|
||||
if (transaction.size() > 1) {
|
||||
transaction.push_back({"EXEC"});
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
}
|
||||
|
||||
for (auto zAdds : {&hostZAdds, &serviceZAdds}) {
|
||||
if (zAdds->size() > 2u) {
|
||||
m_Rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult);
|
||||
rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -386,7 +394,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
setChecksum.clear();
|
||||
setObject.clear();
|
||||
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
});
|
||||
|
||||
auto flushDels ([&]() {
|
||||
|
@ -403,7 +411,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
delChecksum.clear();
|
||||
delObject.clear();
|
||||
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
});
|
||||
|
||||
auto setOne ([&]() {
|
||||
|
@ -464,8 +472,9 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
}
|
||||
|
||||
for (auto& key : GetTypeDumpSignalKeys(type)) {
|
||||
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config);
|
||||
rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config);
|
||||
}
|
||||
rcon->Sync();
|
||||
});
|
||||
|
||||
upq.Join();
|
||||
|
@ -518,13 +527,13 @@ std::vector<std::vector<intrusive_ptr<ConfigObject>>> IcingaDB::ChunkObjects(std
|
|||
return std::move(chunks);
|
||||
}
|
||||
|
||||
void IcingaDB::DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority) {
|
||||
void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority) {
|
||||
std::vector<String> query = {"DEL"};
|
||||
for (auto& key : keys) {
|
||||
query.emplace_back(key);
|
||||
}
|
||||
|
||||
m_Rcon->FireAndForgetQuery(std::move(query), priority);
|
||||
conn->FireAndForgetQuery(std::move(query), priority);
|
||||
}
|
||||
|
||||
std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
|
||||
|
|
|
@ -77,6 +77,16 @@ void IcingaDB::Start(bool runtimeCreated)
|
|||
});
|
||||
m_Rcon->Start();
|
||||
|
||||
for (const Type::Ptr& type : GetTypes()) {
|
||||
auto ctype (dynamic_cast<ConfigType*>(type.get()));
|
||||
if (!ctype)
|
||||
continue;
|
||||
|
||||
RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex()));
|
||||
rCon->Start();
|
||||
m_Rcons[ctype] = std::move(rCon);
|
||||
}
|
||||
|
||||
m_StatsTimer = new Timer();
|
||||
m_StatsTimer->SetInterval(1);
|
||||
m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); });
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
namespace icinga
|
||||
|
@ -57,7 +58,7 @@ private:
|
|||
/* config & status dump */
|
||||
void UpdateAllConfigObjects();
|
||||
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
|
||||
void DeleteKeys(const std::vector<String>& keys, RedisConnection::QueryPriority priority);
|
||||
void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority);
|
||||
std::vector<String> GetTypeOverwriteKeys(const String& type);
|
||||
std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type);
|
||||
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
|
||||
|
@ -150,6 +151,8 @@ private:
|
|||
return std::move(haystack);
|
||||
}
|
||||
|
||||
static std::vector<Type::Ptr> GetTypes();
|
||||
|
||||
Timer::Ptr m_StatsTimer;
|
||||
WorkQueue m_WorkQueue;
|
||||
|
||||
|
@ -160,6 +163,7 @@ private:
|
|||
bool m_ConfigDumpDone;
|
||||
|
||||
RedisConnection::Ptr m_Rcon;
|
||||
std::unordered_map<ConfigType*, RedisConnection::Ptr> m_Rcons;
|
||||
|
||||
struct {
|
||||
DumpedGlobals CustomVar, ActionUrl, NotesUrl, IconImage;
|
||||
|
|
|
@ -187,6 +187,17 @@ void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yiel
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Puts a no-op command with a result at the end of the queue and wait for the result,
|
||||
* i.e. for everything enqueued to be processed by the server.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
void RedisConnection::Sync()
|
||||
{
|
||||
GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark kind as kind of queries not to actually send yet
|
||||
*
|
||||
|
|
|
@ -63,7 +63,8 @@ namespace icinga
|
|||
Config,
|
||||
State,
|
||||
History,
|
||||
CheckResult
|
||||
CheckResult,
|
||||
SyncConnection = 255
|
||||
};
|
||||
|
||||
RedisConnection(const String& host, const int port, const String& path,
|
||||
|
@ -80,6 +81,7 @@ namespace icinga
|
|||
Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
|
||||
|
||||
void EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, QueryPriority priority);
|
||||
void Sync();
|
||||
|
||||
void SuppressQueryKind(QueryPriority kind);
|
||||
void UnsuppressQueryKind(QueryPriority kind);
|
||||
|
|
Loading…
Reference in New Issue