RedisWriter: populate icinga:history:stream:*:notification

This commit is contained in:
Alexander A. Klimov 2019-10-02 12:01:51 +02:00 committed by Michael Friedrich
parent 33bdc8b5c3
commit d0165773d2
2 changed files with 74 additions and 0 deletions

View File

@ -46,6 +46,7 @@
#include "base/utility.hpp"
#include <iterator>
#include <map>
#include <memory>
#include <set>
#include <utility>
@ -92,6 +93,14 @@ void RedisWriter::ConfigStaticInitialize()
Downtime::OnDowntimeTriggered.connect(&RedisWriter::DowntimeChangedHandler);
/* fixed/flexible downtime end */
Downtime::OnDowntimeRemoved.connect(&RedisWriter::DowntimeChangedHandler);
Checkable::OnNotificationSentToAllUsers.connect([](
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
const NotificationType& type, const CheckResult::Ptr& cr, const String& author, const String& text,
const MessageOrigin::Ptr&
) {
RedisWriter::NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
});
}
void RedisWriter::UpdateAllConfigObjects()
@ -1142,6 +1151,41 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object)
m_Rcon->FireAndForgetQuery(std::move(streamadd));
}
void RedisWriter::SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
)
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
auto service (dynamic_pointer_cast<Service>(checkable));
auto output (cr->GetOutput());
auto pos (output.Find("\n"));
String shortOutput, longOutput;
if (pos == String::NPos) {
shortOutput = std::move(output);
} else {
shortOutput = output.SubStr(0, pos);
longOutput = output.SubStr(pos + 1u);
}
m_Rcon->FireAndForgetQuery({
"XADD", service ? "icinga:history:stream:service:notification" : "icinga:history:stream:host:notification", "*",
"id", Utility::NewUniqueID(),
"environment_id", SHA1(GetEnvironment()),
service ? "service_id" : "host_id", GetObjectIdentifier(checkable),
"notification_id", GetObjectIdentifier(notification),
"type", Convert::ToString(type),
"send_time", Convert::ToString(Utility::GetTime()),
"state", Convert::ToString(cr->GetState()),
"output", Utility::ValidateUTF8(std::move(shortOutput)),
"long_output", Utility::ValidateUTF8(std::move(longOutput)),
"users_notified", Convert::ToString(users)
});
}
Dictionary::Ptr RedisWriter::SerializeState(const Checkable::Ptr& checkable)
{
Dictionary::Ptr attrs = new Dictionary();
@ -1313,3 +1357,22 @@ void RedisWriter::DowntimeChangedHandler(const Downtime::Ptr& downtime)
{
StateChangeHandler(downtime->GetCheckable());
}
void RedisWriter::NotificationSentToAllUsersHandler(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
)
{
auto rws (ConfigType::GetObjectsByType<RedisWriter>());
if (!rws.empty()) {
auto usersAmount (users.size());
auto authorAndText (std::make_shared<std::pair<String, String>>(author, text));
for (auto& rw : rws) {
rw->m_WorkQueue.Enqueue([rw, notification, checkable, usersAmount, type, cr, authorAndText]() {
rw->SendSentNotification(notification, checkable, usersAmount, type, cr, authorAndText->first, authorAndText->second);
});
}
}
}

View File

@ -81,6 +81,12 @@ private:
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object);
void SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
);
std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable);
@ -110,6 +116,11 @@ private:
static void VersionChangedHandler(const ConfigObject::Ptr& object);
static void DowntimeChangedHandler(const Downtime::Ptr& downtime);
static void NotificationSentToAllUsersHandler(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
);
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);