Merge pull request #10420 from Icinga/bundled-perfdata-writers-fix

Serialize fields before queueing them to the workqueue
This commit is contained in:
Julian Brost 2025-06-17 10:17:27 +02:00 committed by GitHub
commit 1aa62d4bb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 204 additions and 317 deletions

View File

@ -101,13 +101,13 @@ void ElasticsearchWriter::Resume()
CheckResultHandler(checkable, cr);
});
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr, type);
const CheckResult::Ptr& cr, StateType, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr);
});
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr&,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
NotificationSentToAllUsersHandler(checkable, users, type, cr, author, text);
});
}
@ -236,15 +236,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
}
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -272,38 +263,24 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
fields->Set("reachable", checkable->IsReachable());
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "checkresult", fields, ts);
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
AddCheckResult(fields, checkable, cr);
Enqueue(checkable, "checkresult", fields, cr->GetExecutionEnd());
});
}
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
}
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -325,45 +302,24 @@ void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& check
fields->Set("last_hard_state", host->GetLastHardState());
}
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "statechange", fields, ts);
}
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
if (IsPaused())
return;
AddCheckResult(fields, checkable, cr);
m_WorkQueue.Enqueue([this, notification, checkable, users, type, cr, author, text]() {
NotificationSentToAllUsersHandlerInternal(notification, checkable, users, type, cr, author, text);
Enqueue(checkable, "statechange", fields, cr->GetExecutionEnd());
});
}
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
if (IsPaused())
return;
Host::Ptr host;
Service::Ptr service;
@ -396,27 +352,32 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notifi
fields->Set("notification_type", notificationTypeString);
fields->Set("author", author);
fields->Set("text", text);
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "notification", fields, ts);
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
Enqueue(checkable, "notification", fields, ts);
});
}
void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts)
{
AssertOnWorkQueue();
/* Atomically buffer the data point. */
std::unique_lock<std::mutex> lock(m_DataBufferMutex);

View File

@ -42,16 +42,10 @@ private:
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
void Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts);

View File

@ -94,14 +94,14 @@ void GelfWriter::Resume()
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr);
});
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr&,
const Checkable::Ptr& checkable, const User::Ptr&, const NotificationType& type, const CheckResult::Ptr& cr,
const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
NotificationToUserHandler(checkable, type, cr, author, commentText, commandName);
});
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr, type);
const CheckResult::Ptr& cr, StateType, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr);
});
}
@ -268,18 +268,6 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing check result for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -306,91 +294,74 @@ void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, con
fields->Set("_reachable", checkable->IsReachable());
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
fields->Set("_check_command", checkCommand->GetName());
if (checkCommand)
fields->Set("_check_command", checkCommand->GetName());
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
double ts = Utility::GetTime();
Log(LogDebug, "GelfWriter")
<< "Processing check result for '" << checkable->GetName() << "'";
if (cr) {
fields->Set("_latency", cr->CalculateLatency());
fields->Set("_execution_time", cr->CalculateExecutionTime());
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
ts = cr->GetExecutionEnd();
}
if (cr && GetEnableSendPerfdata()) {
Array::Ptr perfdata = cr->GetPerformanceData();
if (GetEnableSendPerfdata()) {
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "GelfWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "GelfWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkable->GetCheckCommand()->GetName() << "' with value: " << val;
continue;
}
}
String escaped_key = pdv->GetLabel();
boost::replace_all(escaped_key, " ", "_");
boost::replace_all(escaped_key, ".", "_");
boost::replace_all(escaped_key, "\\", "_");
boost::algorithm::replace_all(escaped_key, "::", ".");
fields->Set("_" + escaped_key, pdv->GetValue());
if (!pdv->GetMin().IsEmpty())
fields->Set("_" + escaped_key + "_min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("_" + escaped_key + "_max", pdv->GetMax());
if (!pdv->GetWarn().IsEmpty())
fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
if (!pdv->GetCrit().IsEmpty())
fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
if (!pdv->GetUnit().IsEmpty())
fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
}
String escaped_key = pdv->GetLabel();
boost::replace_all(escaped_key, " ", "_");
boost::replace_all(escaped_key, ".", "_");
boost::replace_all(escaped_key, "\\", "_");
boost::algorithm::replace_all(escaped_key, "::", ".");
fields->Set("_" + escaped_key, pdv->GetValue());
if (!pdv->GetMin().IsEmpty())
fields->Set("_" + escaped_key + "_min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("_" + escaped_key + "_max", pdv->GetMax());
if (!pdv->GetWarn().IsEmpty())
fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
if (!pdv->GetCrit().IsEmpty())
fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
if (!pdv->GetUnit().IsEmpty())
fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
}
}
}
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
});
}
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName)
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
if (IsPaused())
return;
Host::Ptr host;
Service::Ptr service;
@ -430,32 +401,23 @@ void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& noti
fields->Set("_command", commandName);
fields->Set("_notification_type", notificationTypeString);
fields->Set("_comment", authorComment);
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
if (commandObj)
fields->Set("_check_command", commandObj->GetName());
Log(LogDebug, "GelfWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
});
}
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
}
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing state change for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -478,21 +440,19 @@ void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, con
fields->Set("_last_hard_state", host->GetLastHardState());
}
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
if (commandObj)
fields->Set("_check_command", commandObj->GetName());
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
double ts = Utility::GetTime();
Log(LogDebug, "GelfWriter")
<< "Processing state change for '" << checkable->GetName() << "'";
if (cr) {
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
ts = cr->GetExecutionEnd();
}
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
});
}
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
@ -506,6 +466,8 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
{
AssertOnWorkQueue();
std::ostringstream msgbuf;
msgbuf << gelfMessage;
msgbuf << '\0';

View File

@ -40,15 +40,9 @@ private:
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
const String& author, const String& commentText, const String& commandName);
void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notification_type, const CheckResult::Ptr& cr,
const String& author, const String& comment_text, const String& command_name);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);

View File

@ -261,27 +261,6 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}
/**
* Check result event handler, prepares metadata and perfdata values and calls Send*()
*
* Called inside the WQ.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
/* TODO: Deal with missing connection here. Needs refactoring
* into parsing the actual performance data and then putting it
* into a queue for re-inserting. */
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -306,29 +285,34 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
});
}
String prefixPerfdata = prefix + ".perfdata";
String prefixMetadata = prefix + ".metadata";
double ts = cr->GetExecutionEnd();
std::vector<std::pair<String, double>> metadata;
if (GetEnableSendMetadata()) {
if (service) {
SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
} else {
SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
}
SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
metadata = {
{"state", service ? service->GetState() : host->GetState()},
{"current_attempt", checkable->GetCheckAttempt()},
{"max_check_attempts", checkable->GetMaxCheckAttempts()},
{"state_type", checkable->GetStateType()},
{"reachable", checkable->IsReachable()},
{"downtime_depth", checkable->GetDowntimeDepth()},
{"acknowledgement", checkable->GetAcknowledgement()},
{"latency", cr->CalculateLatency()},
{"execution_time", cr->CalculateExecutionTime()}
};
}
SendPerfdata(checkable, prefixPerfdata, cr, ts);
m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() {
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
/* TODO: Deal with missing connection here. Needs refactoring
* into parsing the actual performance data and then putting it
* into a queue for re-inserting. */
for (auto& [name, val] : metadata) {
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
}
SendPerfdata(checkable, prefix + ".perfdata", cr);
});
}
/**
@ -337,10 +321,11 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
* @param checkable Host/service object
* @param prefix Metric prefix string
* @param cr Check result including performance data
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
Array::Ptr perfdata = cr->GetPerformanceData();
if (!perfdata)
@ -367,6 +352,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
}
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
double ts = cr->GetExecutionEnd();
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
@ -394,6 +380,8 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
AssertOnWorkQueue();
namespace asio = boost::asio;
std::ostringstream msgbuf;

View File

@ -45,9 +45,8 @@ private:
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
static String EscapeMetric(const String& str);
static String EscapeMetricLabel(const String& str);
static Value EscapeMacroMetric(const Value& value);

View File

@ -204,15 +204,6 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
}
void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -225,10 +216,6 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
resolvers.emplace_back("service", service);
resolvers.emplace_back("host", host);
String prefix;
double ts = cr->GetExecutionEnd();
// Clone the template and perform an in-place macro expansion of measurement and tag values
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone());
@ -253,56 +240,9 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
tmpl->Set("tags", tags);
}
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
Dictionary::Ptr fields;
if (GetEnableSendMetadata()) {
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
fields = new Dictionary();
if (service)
fields->Set("state", new InfluxdbInteger(service->GetState()));
@ -317,9 +257,57 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
fields->Set("latency", cr->CalculateLatency());
fields->Set("execution_time", cr->CalculateExecutionTime());
SendMetric(checkable, tmpl, Empty, fields, ts);
}
m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() {
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
double ts = cr->GetExecutionEnd();
if (Array::Ptr perfdata = cr->GetPerformanceData()) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkable->GetCheckCommand()->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
if (metadataFields) {
SendMetric(checkable, tmpl, Empty, metadataFields, ts);
}
}, PriorityLow);
}
String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str)
@ -364,6 +352,8 @@ String InfluxdbCommonWriter::EscapeValue(const Value& value)
void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts)
{
AssertOnWorkQueue();
std::ostringstream msgbuf;
msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));

View File

@ -54,7 +54,6 @@ private:
std::atomic_size_t m_DataBufferSize{0};
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout();