Icinga DB: Sync state using runtime updates

This commit is contained in:
Noah Hilverling 2021-04-22 09:15:55 +02:00
parent ab04a4ee98
commit 4005d81a43
4 changed files with 58 additions and 24 deletions

View File

@ -218,9 +218,11 @@ void IcingaDB::UpdateAllConfigObjects()
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
std::map<String, std::vector<String>> hMSets; std::map<String, std::vector<String>> hMSets;
std::vector<String> states = {"HMSET", m_PrefixStateObject + lcType}; // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded)
std::vector<Dictionary::Ptr> runtimeUpdates; std::vector<String> states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
std::vector<std::vector<String> > transaction = {{"MULTI"}}; // Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded)
std::vector<String> statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
std::vector<std::vector<String> > transaction = {{"MULTI"}};
std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"};
auto skimObjects ([&]() { auto skimObjects ([&]() {
@ -243,12 +245,19 @@ void IcingaDB::UpdateAllConfigObjects()
if (lcType != GetLowerCaseTypeNameDB(object)) if (lcType != GetLowerCaseTypeNameDB(object))
continue; continue;
std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false); CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false);
// Write out inital state for checkables // Write out inital state for checkables
if (dumpState) { if (dumpState) {
states.emplace_back(GetObjectIdentifier(object)); String objectKey = GetObjectIdentifier(object);
states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast<Checkable>(object)))); Dictionary::Ptr state = SerializeState(dynamic_pointer_cast<Checkable>(object));
states.emplace_back(objectKey);
states.emplace_back(JsonEncode(state));
statesChksms.emplace_back(objectKey);
statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}})));
} }
bulkCounter++; bulkCounter++;
@ -264,7 +273,9 @@ void IcingaDB::UpdateAllConfigObjects()
if (states.size() > 2) { if (states.size() > 2) {
transaction.emplace_back(std::move(states)); transaction.emplace_back(std::move(states));
states = {"HMSET", m_PrefixStateObject + lcType}; transaction.emplace_back(std::move(statesChksms));
states = {"HMSET", m_PrefixConfigObject + lcType + ":state"};
statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"};
} }
hMSets = decltype(hMSets)(); hMSets = decltype(hMSets)();
@ -524,7 +535,7 @@ std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
if (type == "host" || type == "service" || type == "user") { if (type == "host" || type == "service" || type == "user") {
keys.emplace_back(m_PrefixConfigObject + type + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + type + ":groupmember");
keys.emplace_back(m_PrefixStateObject + type); keys.emplace_back(m_PrefixConfigObject + type + ":state");
} else if (type == "timeperiod") { } else if (type == "timeperiod") {
keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); keys.emplace_back(m_PrefixConfigObject + type + ":override:include");
keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude");
@ -556,7 +567,7 @@ std::vector<String> IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type)
if (type == Host::TypeInstance || type == Service::TypeInstance) { if (type == Host::TypeInstance || type == Service::TypeInstance) {
keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember");
keys.emplace_back(m_PrefixStateObject + lcType); keys.emplace_back(m_PrefixConfigObject + lcType + ":state");
} else if (type == User::TypeInstance) { } else if (type == User::TypeInstance) {
keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember"); keys.emplace_back(m_PrefixConfigObject + lcType + ":groupmember");
} else if (type == TimePeriod::TypeInstance) { } else if (type == TimePeriod::TypeInstance) {
@ -1049,9 +1060,14 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable)
if (!m_Rcon || !m_Rcon->IsConnected()) if (!m_Rcon || !m_Rcon->IsConnected())
return; return;
String objectType = GetLowerCaseTypeNameDB(checkable);
String objectKey = GetObjectIdentifier(checkable);
Dictionary::Ptr stateAttrs = SerializeState(checkable); Dictionary::Ptr stateAttrs = SerializeState(checkable);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}, Prio::State); m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + objectType + ":state", objectKey, JsonEncode(stateAttrs)}, Prio::State);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + objectType + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", HashValue(stateAttrs)}}))}, Prio::State);
} }
// Used to update a single object, used for runtime updates // Used to update a single object, used for runtime updates
@ -1063,15 +1079,22 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
String typeName = GetLowerCaseTypeNameDB(object); String typeName = GetLowerCaseTypeNameDB(object);
std::map<String, std::vector<String>> hMSets; std::map<String, std::vector<String>> hMSets;
std::vector<String> states = {"HMSET", m_PrefixStateObject + typeName};
std::vector<Dictionary::Ptr> runtimeUpdates; std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) { if (checkable) {
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}, Prio::State); Dictionary::Ptr state = SerializeState(checkable);
publishes["icinga:config:update:state:" + typeName].emplace_back(objectKey); String checksum = HashValue(state);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigObject + typeName + ":state", objectKey, JsonEncode(state)}, Prio::State);
m_Rcon->FireAndForgetQuery({"HSET", m_PrefixConfigCheckSum + typeName + ":state", objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, Prio::State);
if (runtimeUpdate) {
state->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName + ":state", state);
}
} }
std::vector<std::vector<String> > transaction = {{"MULTI"}}; std::vector<std::vector<String> > transaction = {{"MULTI"}};
@ -1443,14 +1466,14 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
auto checkable (dynamic_pointer_cast<Checkable>(object)); auto checkable (dynamic_pointer_cast<Checkable>(object));
if (checkable) { if (checkable) {
m_Rcon->FireAndForgetQuery( m_Rcon->FireAndForgetQueries({
{ {
"ZREM", "ZREM",
dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host",
GetObjectIdentifier(checkable) GetObjectIdentifier(checkable)
}, },
Prio::CheckResult {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey},
); }, Prio::CheckResult);
} }
} }
@ -1480,19 +1503,22 @@ void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResu
tie(host, service) = GetHostService(checkable); tie(host, service) = GetHostService(checkable);
String streamname; String redisKey;
if (service) if (service)
streamname = "icinga:state:stream:service"; redisKey = "icinga:service:state";
else else
streamname = "icinga:state:stream:host"; redisKey = "icinga:host:state";
Dictionary::Ptr objectAttrs = SerializeState(checkable); Dictionary::Ptr objectAttrs = SerializeState(checkable);
objectAttrs->Set("redis_key", redisKey);
objectAttrs->Set("runtime_type", "upsert");
objectAttrs->Set("checksum", HashValue(objectAttrs));
std::vector<String> streamadd({"XADD", streamname, "*"}); std::vector<String> streamadd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
ObjectLock olock(objectAttrs); ObjectLock olock(objectAttrs);
for (const Dictionary::Pair& kv : objectAttrs) { for (const Dictionary::Pair& kv : objectAttrs) {
streamadd.emplace_back(kv.first); streamadd.emplace_back(kv.first);
streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); streamadd.emplace_back(IcingaToStreamValue(kv.second));
} }
m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State); m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::State);
@ -2084,7 +2110,17 @@ Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable)
tie(host, service) = GetHostService(checkable); tie(host, service) = GetHostService(checkable);
attrs->Set("id", GetObjectIdentifier(checkable));; String id = GetObjectIdentifier(checkable);
/*
* As there is a 1:1 relationship between host and host state, the host ID ('object_id')
* is also used as the host state ID ('id'). These are duplicated to 1) avoid having
* special handling for this in Icinga DB and 2) to have both a primary key and a foreign key
* in the SQL database in the end. In the database 'object_id' ends up as foreign key 'host_state.host_id'
* referring to 'host.id' while 'id' ends up as the primary key 'host_state.id'. This also applies for service.
*/
attrs->Set("id", id);
attrs->Set("object_id", id);
attrs->Set("environment_id", m_EnvironmentId); attrs->Set("environment_id", m_EnvironmentId);
attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard); attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard);

View File

@ -217,7 +217,7 @@ String IcingaDB::IcingaToStreamValue(const Value& value)
{ {
switch (value.GetType()) { switch (value.GetType()) {
case ValueBoolean: case ValueBoolean:
return Convert::ToString((unsigned short)value); return Convert::ToString(int(value));
case ValueString: case ValueString:
return Utility::ValidateUTF8(value); return Utility::ValidateUTF8(value);
case ValueNumber: case ValueNumber:

View File

@ -50,7 +50,6 @@ IcingaDB::IcingaDB()
m_PrefixConfigObject = "icinga:"; m_PrefixConfigObject = "icinga:";
m_PrefixConfigCheckSum = "icinga:checksum:"; m_PrefixConfigCheckSum = "icinga:checksum:";
m_PrefixStateObject = "icinga:config:state:";
} }
/** /**

View File

@ -155,7 +155,6 @@ private:
String m_PrefixConfigObject; String m_PrefixConfigObject;
String m_PrefixConfigCheckSum; String m_PrefixConfigCheckSum;
String m_PrefixStateObject;
bool m_ConfigDumpInProgress; bool m_ConfigDumpInProgress;
bool m_ConfigDumpDone; bool m_ConfigDumpDone;