icinga2/lib/perfdata/graphitewriter.cpp
Alexander A. Klimov ba7102cae3 Explicitly stop started timers and wait for them
before permitting their parent objects' destruction.
For the cases where the handlers have raw pointers to these objects.
2023-04-14 14:52:04 +02:00

515 lines
14 KiB
C++

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "perfdata/graphitewriter.hpp"
#include "perfdata/graphitewriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <utility>
using namespace icinga;
REGISTER_TYPE(GraphiteWriter);
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
/*
* Enable HA capabilities once the config object is loaded.
*/
void GraphiteWriter::OnConfigLoaded()
{
ObjectImpl<GraphiteWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GraphiteWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "GraphiteWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
* @param perfdata Array of PerfdataValue objects
*/
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "connected", graphitewriter->GetConnected() }
}));
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("graphitewriter", new Dictionary(std::move(nodes)));
}
/**
* Resume is equivalent to Start, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Resume()
{
ObjectImpl<GraphiteWriter>::Resume();
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
/* Timer for reconnecting */
m_ReconnectTimer = Timer::Create();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
/* Register event handlers. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr);
});
}
/**
* Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Pause()
{
m_HandleCheckResults.disconnect();
m_ReconnectTimer->Stop(true);
try {
ReconnectInternal();
} catch (const std::exception&) {
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
ObjectImpl<GraphiteWriter>::Pause();
return;
}
m_WorkQueue.Join();
DisconnectInternal();
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' paused.";
ObjectImpl<GraphiteWriter>::Pause();
}
/**
* Check if method is called inside the WQ thread.
*/
void GraphiteWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
/**
* Exception handler for the WQ.
*
* Closes the connection if connected.
*
* @param exp Exception pointer
*/
void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
Log(LogDebug, "GraphiteWriter")
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) {
m_Stream->close();
SetConnected(false);
}
}
/**
* Reconnect method, stops when the feature is paused in HA zones.
*
* Called inside the WQ.
*/
void GraphiteWriter::Reconnect()
{
AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
ReconnectInternal();
}
/**
* Reconnect method, connects to a TCP Stream
*/
void GraphiteWriter::ReconnectInternal()
{
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
SetShouldConnect(true);
if (GetConnected())
return;
Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
SetConnected(false);
throw;
}
SetConnected(true);
Log(LogInformation, "GraphiteWriter")
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
* Reconnect handler called by the timer.
*
* Enqueues a reconnect task into the WQ.
*/
void GraphiteWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
}
/**
* Disconnect the stream.
*
* Called inside the WQ.
*/
void GraphiteWriter::Disconnect()
{
AssertOnWorkQueue();
DisconnectInternal();
}
/**
* Disconnect the stream.
*
* Called outside the WQ.
*/
void GraphiteWriter::DisconnectInternal()
{
if (!GetConnected())
return;
m_Stream->close();
SetConnected(false);
}
/**
* Check result event handler, checks whether feature is not paused in HA setups.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}
/**
* Check result event handler, prepares metadata and perfdata values and calls Send*()
*
* Called inside the WQ.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
/* TODO: Deal with missing connection here. Needs refactoring
* into parsing the actual performance data and then putting it
* into a queue for re-inserting. */
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
MacroProcessor::ResolverList resolvers;
if (service)
resolvers.emplace_back("service", service);
resolvers.emplace_back("host", host);
String prefix;
if (service) {
prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
return EscapeMacroMetric(value);
});
} else {
prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
return EscapeMacroMetric(value);
});
}
String prefixPerfdata = prefix + ".perfdata";
String prefixMetadata = prefix + ".metadata";
double ts = cr->GetExecutionEnd();
if (GetEnableSendMetadata()) {
if (service) {
SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
} else {
SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
}
SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
}
SendPerfdata(checkable, prefixPerfdata, cr, ts);
}
/**
* Parse performance data from check result and call SendMetric()
*
* @param checkable Host/service object
* @param prefix Metric prefix string
* @param cr Check result including performance data
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
if (!perfdata)
return;
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "GraphiteWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
}
}
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
if (!pdv->GetWarn().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
if (!pdv->GetMin().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
if (!pdv->GetMax().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
}
}
}
/**
* Computes metric data and sends to Graphite
*
* @param checkable Host/service object
* @param prefix Computed metric prefix string
* @param name Metric name
* @param value Metric value
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
namespace asio = boost::asio;
std::ostringstream msgbuf;
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
Log(LogDebug, "GraphiteWriter")
<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
// do not send \n to debug log
msgbuf << "\n";
std::unique_lock<std::mutex> lock(m_StreamMutex);
if (!GetConnected())
return;
try {
asio::write(*m_Stream, asio::buffer(msgbuf.str()));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
}
/**
* Escape metric tree elements
*
* Dots are not allowed, e.g. in host names
*
* @param str Metric part name
* @return Escape string
*/
String GraphiteWriter::EscapeMetric(const String& str)
{
String result = str;
//don't allow '.' in metric prefixes
boost::replace_all(result, " ", "_");
boost::replace_all(result, ".", "_");
boost::replace_all(result, "\\", "_");
boost::replace_all(result, "/", "_");
return result;
}
/**
* Escape metric label
*
* Dots are allowed - users can create trees from perfdata labels
*
* @param str Metric label name
* @return Escaped string
*/
String GraphiteWriter::EscapeMetricLabel(const String& str)
{
String result = str;
//allow to pass '.' in perfdata labels
boost::replace_all(result, " ", "_");
boost::replace_all(result, "\\", "_");
boost::replace_all(result, "/", "_");
boost::replace_all(result, "::", ".");
return result;
}
/**
* Escape macro metrics found via host/service name templates
*
* @param value Array or string with macro metric names
* @return Escaped string. Arrays are joined with dots.
*/
Value GraphiteWriter::EscapeMacroMetric(const Value& value)
{
if (value.IsObjectType<Array>()) {
Array::Ptr arr = value;
ArrayData result;
ObjectLock olock(arr);
for (const Value& arg : arr) {
result.push_back(EscapeMetric(arg));
}
return Utility::Join(new Array(std::move(result)), '.');
} else
return EscapeMetric(value);
}
/**
* Validate the configuration setting 'host_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
if (!MacroProcessor::ValidateMacroString(lvalue()))
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
}
/**
* Validate the configuration setting 'service_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
if (!MacroProcessor::ValidateMacroString(lvalue()))
BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
}