RedisWriter: populate icinga:history:stream:*:state

This commit is contained in:
Alexander A. Klimov 2019-10-02 13:14:31 +02:00 committed by Michael Friedrich
parent d0165773d2
commit feeae9d518
2 changed files with 51 additions and 19 deletions

View File

@ -70,8 +70,8 @@ INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize);
void RedisWriter::ConfigStaticInitialize() void RedisWriter::ConfigStaticInitialize()
{ {
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr&, StateType, const MessageOrigin::Ptr&) { Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
RedisWriter::StateChangeHandler(checkable); RedisWriter::StateChangeHandler(checkable, cr, type);
}); });
/* triggered when acknowledged host/service goes back to ok and when the acknowledgement gets deleted */ /* triggered when acknowledged host/service goes back to ok and when the acknowledgement gets deleted */
@ -103,6 +103,19 @@ void RedisWriter::ConfigStaticInitialize()
}); });
} }
static std::pair<String, String> SplitOutput(String output)
{
String longOutput;
auto pos (output.Find("\n"));
if (pos != String::NPos) {
longOutput = output.SubStr(pos + 1u);
output.erase(output.Begin() + pos, output.End());
}
return {std::move(output), std::move(longOutput)};
}
void RedisWriter::UpdateAllConfigObjects() void RedisWriter::UpdateAllConfigObjects()
{ {
double startTime = Utility::GetTime(); double startTime = Utility::GetTime();
@ -1119,7 +1132,7 @@ void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object)
}); });
} }
void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object) void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type)
{ {
if (!m_Rcon || !m_Rcon->IsConnected()) if (!m_Rcon || !m_Rcon->IsConnected())
return; return;
@ -1149,6 +1162,26 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object)
} }
m_Rcon->FireAndForgetQuery(std::move(streamadd)); m_Rcon->FireAndForgetQuery(std::move(streamadd));
auto output (SplitOutput(cr->GetOutput()));
m_Rcon->FireAndForgetQuery({
"XADD", service ? "icinga:history:stream:service:state" : "icinga:history:stream:host:state", "*",
"id", Utility::NewUniqueID(),
"environment_id", SHA1(GetEnvironment()),
service ? "service_id" : "host_id", GetObjectIdentifier(checkable),
"change_time", Convert::ToString(cr->GetExecutionEnd()),
"state_type", Convert::ToString(type),
"soft_state", Convert::ToString(cr->GetState()),
"hard_state", Convert::ToString(service ? service->GetLastHardState() : host->GetLastHardState()),
"attempt", Convert::ToString(checkable->GetCheckAttempt()),
// TODO: last_hard/soft_state should be "previous".
"last_soft_state", Convert::ToString(cr->GetState()),
"last_hard_state", Convert::ToString(service ? service->GetLastHardState() : host->GetLastHardState()),
"output", Utility::ValidateUTF8(std::move(output.first)),
"long_output", Utility::ValidateUTF8(std::move(output.second)),
"max_check_attempts", Convert::ToString(checkable->GetMaxCheckAttempts())
});
} }
void RedisWriter::SendSentNotification( void RedisWriter::SendSentNotification(
@ -1160,16 +1193,7 @@ void RedisWriter::SendSentNotification(
return; return;
auto service (dynamic_pointer_cast<Service>(checkable)); auto service (dynamic_pointer_cast<Service>(checkable));
auto output (cr->GetOutput()); auto output (SplitOutput(cr->GetOutput()));
auto pos (output.Find("\n"));
String shortOutput, longOutput;
if (pos == String::NPos) {
shortOutput = std::move(output);
} else {
shortOutput = output.SubStr(0, pos);
longOutput = output.SubStr(pos + 1u);
}
m_Rcon->FireAndForgetQuery({ m_Rcon->FireAndForgetQuery({
"XADD", service ? "icinga:history:stream:service:notification" : "icinga:history:stream:host:notification", "*", "XADD", service ? "icinga:history:stream:service:notification" : "icinga:history:stream:host:notification", "*",
@ -1180,8 +1204,8 @@ void RedisWriter::SendSentNotification(
"type", Convert::ToString(type), "type", Convert::ToString(type),
"send_time", Convert::ToString(Utility::GetTime()), "send_time", Convert::ToString(Utility::GetTime()),
"state", Convert::ToString(cr->GetState()), "state", Convert::ToString(cr->GetState()),
"output", Utility::ValidateUTF8(std::move(shortOutput)), "output", Utility::ValidateUTF8(std::move(output.first)),
"long_output", Utility::ValidateUTF8(std::move(longOutput)), "long_output", Utility::ValidateUTF8(std::move(output.second)),
"users_notified", Convert::ToString(users) "users_notified", Convert::ToString(users)
}); });
} }
@ -1326,10 +1350,17 @@ RedisWriter::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType,
void RedisWriter::StateChangeHandler(const ConfigObject::Ptr& object) void RedisWriter::StateChangeHandler(const ConfigObject::Ptr& object)
{ {
Type::Ptr type = object->GetReflectionType(); auto checkable (dynamic_pointer_cast<Checkable>(object));
if (checkable) {
RedisWriter::StateChangeHandler(object, checkable->GetLastCheckResult(), checkable->GetStateType());
}
}
void RedisWriter::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type)
{
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) { for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendStatusUpdate(object); }); rw->m_WorkQueue.Enqueue([rw, object, cr, type]() { rw->SendStatusUpdate(object, cr, type); });
} }
} }

View File

@ -80,7 +80,7 @@ private:
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets, void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate); std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object); void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object); void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
void SendSentNotification( void SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users, const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users,
@ -113,6 +113,7 @@ private:
static bool PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checkSums); static bool PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checkSums);
static void StateChangeHandler(const ConfigObject::Ptr& object); static void StateChangeHandler(const ConfigObject::Ptr& object);
static void StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
static void VersionChangedHandler(const ConfigObject::Ptr& object); static void VersionChangedHandler(const ConfigObject::Ptr& object);
static void DowntimeChangedHandler(const Downtime::Ptr& downtime); static void DowntimeChangedHandler(const Downtime::Ptr& downtime);