Serialize fields before queueing the event to the workqueue

This commit is contained in:
Yonas Habteab 2025-04-25 12:17:19 +02:00
parent a589b87d6c
commit cef6fb77e5
8 changed files with 191 additions and 253 deletions

View File

@ -236,15 +236,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
}
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -274,10 +265,15 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
fields->Set("reachable", checkable->IsReachable());
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
AddCheckResult(fields, checkable, cr);
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "checkresult", fields, cr->GetExecutionEnd());
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
AddCheckResult(fields, checkable, cr);
Enqueue(checkable, "checkresult", fields, cr->GetExecutionEnd());
});
}
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -285,15 +281,6 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { StateChangeHandlerInternal(checkable, cr); });
}
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -317,10 +304,15 @@ void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& check
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
AddCheckResult(fields, checkable, cr);
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "statechange", fields, cr->GetExecutionEnd());
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
AddCheckResult(fields, checkable, cr);
Enqueue(checkable, "statechange", fields, cr->GetExecutionEnd());
});
}
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
@ -329,21 +321,6 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, users, type, cr, author, text]() {
NotificationSentToAllUsersHandlerInternal(checkable, users, type, cr, author, text);
});
}
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -377,21 +354,30 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Checka
fields->Set("text", text);
fields->Set("check_command", checkable->GetCheckCommand()->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
AddTemplateTags(fields, checkable, cr);
Enqueue(checkable, "notification", fields, ts);
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
Enqueue(checkable, "notification", fields, ts);
});
}
void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts)
{
AssertOnWorkQueue();
/* Atomically buffer the data point. */
std::unique_lock<std::mutex> lock(m_DataBufferMutex);

View File

@ -43,13 +43,9 @@ private:
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandlerInternal(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
void Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts);

View File

@ -268,18 +268,6 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing check result for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -308,80 +296,72 @@ void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, con
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
fields->Set("_check_command", checkCommand->GetName());
fields->Set("_latency", cr->CalculateLatency());
fields->Set("_execution_time", cr->CalculateExecutionTime());
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
if (GetEnableSendPerfdata()) {
Array::Ptr perfdata = cr->GetPerformanceData();
Log(LogDebug, "GelfWriter")
<< "Processing check result for '" << checkable->GetName() << "'";
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
fields->Set("_latency", cr->CalculateLatency());
fields->Set("_execution_time", cr->CalculateExecutionTime());
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "GelfWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
if (GetEnableSendPerfdata()) {
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "GelfWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkable->GetCheckCommand()->GetName() << "' with value: " << val;
continue;
}
}
String escaped_key = pdv->GetLabel();
boost::replace_all(escaped_key, " ", "_");
boost::replace_all(escaped_key, ".", "_");
boost::replace_all(escaped_key, "\\", "_");
boost::algorithm::replace_all(escaped_key, "::", ".");
fields->Set("_" + escaped_key, pdv->GetValue());
if (!pdv->GetMin().IsEmpty())
fields->Set("_" + escaped_key + "_min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("_" + escaped_key + "_max", pdv->GetMax());
if (!pdv->GetWarn().IsEmpty())
fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
if (!pdv->GetCrit().IsEmpty())
fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
if (!pdv->GetUnit().IsEmpty())
fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
}
String escaped_key = pdv->GetLabel();
boost::replace_all(escaped_key, " ", "_");
boost::replace_all(escaped_key, ".", "_");
boost::replace_all(escaped_key, "\\", "_");
boost::algorithm::replace_all(escaped_key, "::", ".");
fields->Set("_" + escaped_key, pdv->GetValue());
if (!pdv->GetMin().IsEmpty())
fields->Set("_" + escaped_key + "_min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("_" + escaped_key + "_max", pdv->GetMax());
if (!pdv->GetWarn().IsEmpty())
fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
if (!pdv->GetCrit().IsEmpty())
fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
if (!pdv->GetUnit().IsEmpty())
fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
}
}
}
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
}
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
CheckResult::Ptr const& cr, const String& author, const String& commentText, const String& commandName)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, notificationType, cr, author, commentText, commandName]() {
NotificationToUserHandlerInternal(checkable, notificationType, cr, author, commentText, commandName);
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
});
}
void GelfWriter::NotificationToUserHandlerInternal(const Checkable::Ptr& checkable, NotificationType notificationType,
CheckResult::Ptr const& cr, const String& author, const String& commentText, const String& commandName)
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
if (IsPaused())
return;
Host::Ptr host;
Service::Ptr service;
@ -423,7 +403,14 @@ void GelfWriter::NotificationToUserHandlerInternal(const Checkable::Ptr& checkab
fields->Set("_comment", authorComment);
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
});
}
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -431,18 +418,6 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { StateChangeHandlerInternal(checkable, cr); });
}
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing state change for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
@ -470,7 +445,14 @@ void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, con
fields->Set("full_message", cr->GetOutput());
fields->Set("_check_source", cr->GetCheckSource());
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
Log(LogDebug, "GelfWriter")
<< "Processing state change for '" << checkable->GetName() << "'";
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
});
}
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
@ -484,6 +466,8 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
{
AssertOnWorkQueue();
std::ostringstream msgbuf;
msgbuf << gelfMessage;
msgbuf << '\0';

View File

@ -40,13 +40,9 @@ private:
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName);
void NotificationToUserHandlerInternal(const Checkable::Ptr& checkable, NotificationType notification_type,
const CheckResult::Ptr& cr, const String& author, const String& comment_text, const String& command_name);
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
const String& author, const String& commentText, const String& commandName);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);

View File

@ -261,27 +261,6 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}
/**
* Check result event handler, prepares metadata and perfdata values and calls Send*()
*
* Called inside the WQ.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
/* TODO: Deal with missing connection here. Needs refactoring
* into parsing the actual performance data and then putting it
* into a queue for re-inserting. */
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -306,29 +285,34 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
});
}
String prefixPerfdata = prefix + ".perfdata";
String prefixMetadata = prefix + ".metadata";
double ts = cr->GetExecutionEnd();
std::vector<std::pair<String, double>> metadata;
if (GetEnableSendMetadata()) {
if (service) {
SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
} else {
SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
}
SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
metadata = {
{"state", service ? service->GetState() : host->GetState()},
{"current_attempt", checkable->GetCheckAttempt()},
{"max_check_attempts", checkable->GetMaxCheckAttempts()},
{"state_type", checkable->GetStateType()},
{"reachable", checkable->IsReachable()},
{"downtime_depth", checkable->GetDowntimeDepth()},
{"acknowledgement", checkable->GetAcknowledgement()},
{"latency", cr->CalculateLatency()},
{"execution_time", cr->CalculateExecutionTime()}
};
}
SendPerfdata(checkable, prefixPerfdata, cr, ts);
m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() {
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
/* TODO: Deal with missing connection here. Needs refactoring
* into parsing the actual performance data and then putting it
* into a queue for re-inserting. */
for (auto& [name, val] : metadata) {
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
}
SendPerfdata(checkable, prefix + ".perfdata", cr);
});
}
/**
@ -337,10 +321,11 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
* @param checkable Host/service object
* @param prefix Metric prefix string
* @param cr Check result including performance data
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
Array::Ptr perfdata = cr->GetPerformanceData();
if (!perfdata)
@ -367,6 +352,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
}
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
double ts = cr->GetExecutionEnd();
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
@ -394,6 +380,8 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
AssertOnWorkQueue();
namespace asio = boost::asio;
std::ostringstream msgbuf;

View File

@ -45,9 +45,8 @@ private:
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
static String EscapeMetric(const String& str);
static String EscapeMetricLabel(const String& str);
static Value EscapeMacroMetric(const Value& value);

View File

@ -204,15 +204,6 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
}
void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
@ -225,10 +216,6 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
resolvers.emplace_back("service", service);
resolvers.emplace_back("host", host);
String prefix;
double ts = cr->GetExecutionEnd();
// Clone the template and perform an in-place macro expansion of measurement and tag values
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone());
@ -253,56 +240,9 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
tmpl->Set("tags", tags);
}
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
Dictionary::Ptr fields;
if (GetEnableSendMetadata()) {
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
fields = new Dictionary();
if (service)
fields->Set("state", new InfluxdbInteger(service->GetState()));
@ -317,9 +257,57 @@ void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable,
fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
fields->Set("latency", cr->CalculateLatency());
fields->Set("execution_time", cr->CalculateExecutionTime());
SendMetric(checkable, tmpl, Empty, fields, ts);
}
m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() {
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
double ts = cr->GetExecutionEnd();
if (Array::Ptr perfdata = cr->GetPerformanceData()) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkable->GetCheckCommand()->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
if (metadataFields) {
SendMetric(checkable, tmpl, Empty, metadataFields, ts);
}
}, PriorityLow);
}
String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str)
@ -364,6 +352,8 @@ String InfluxdbCommonWriter::EscapeValue(const Value& value)
void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts)
{
AssertOnWorkQueue();
std::ostringstream msgbuf;
msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));

View File

@ -54,7 +54,6 @@ private:
std::atomic_size_t m_DataBufferSize{0};
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout();