Merge pull request #8719 from Icinga/feature/influxdb-2-8711

Introduce Influxdb2Writer
This commit is contained in:
Alexander Aleksandrovič Klimov 2021-07-21 17:59:58 +02:00 committed by GitHub
commit d073d2268e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1015 additions and 699 deletions

View File

@ -3001,7 +3001,7 @@ By default, the following features provide advanced HA functionality:
* [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter) * [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter)
* [Gelf](09-object-types.md#objecttype-gelfwriter) * [Gelf](09-object-types.md#objecttype-gelfwriter)
* [Graphite](09-object-types.md#objecttype-graphitewriter) * [Graphite](09-object-types.md#objecttype-graphitewriter)
* [InfluxDB](09-object-types.md#objecttype-influxdbwriter) * [InfluxDB](09-object-types.md#objecttype-influxdb2writer) (v1 and v2)
* [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter) * [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter)
* [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP) * [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP)

View File

@ -1604,8 +1604,9 @@ Runtime Attributes:
### InfluxdbWriter <a id="objecttype-influxdbwriter"></a> ### InfluxdbWriter <a id="objecttype-influxdbwriter"></a>
Writes check result metrics and performance data to a defined InfluxDB host. Writes check result metrics and performance data to a defined InfluxDB v1 host.
This configuration object is available as [influxdb feature](14-features.md#influxdb-writer). This configuration object is available as [influxdb feature](14-features.md#influxdb-writer).
For InfluxDB v2 support see the [Influxdb2Writer](#objecttype-influxdb2writer) below.
Example: Example:
@ -1669,6 +1670,68 @@ or similar.
### Influxdb2Writer <a id="objecttype-influxdb2writer"></a>
Writes check result metrics and performance data to a defined InfluxDB v2 host.
This configuration object is available as [influxdb feature](14-features.md#influxdb-writer).
For InfluxDB v1 support see the [InfluxdbWriter](#objecttype-influxdbwriter) above.
Example:
```
object Influxdb2Writer "influxdb2" {
host = "127.0.0.1"
port = 8086
organization = "monitoring"
bucket = "icinga2"
auth_token = "ABCDEvwxyz0189-_"
flush_threshold = 1024
flush_interval = 10s
host_template = {
measurement = "$host.check_command$"
tags = {
hostname = "$host.name$"
}
}
service_template = {
measurement = "$service.check_command$"
tags = {
hostname = "$host.name$"
service = "$service.name$"
}
}
}
```
Configuration Attributes:
Name | Type | Description
--------------------------|-----------------------|----------------------------------
host | String | **Required.** InfluxDB host address. Defaults to `127.0.0.1`.
port | Number | **Required.** InfluxDB HTTP port. Defaults to `8086`.
organization | String | **Required.** InfluxDB organization name.
bucket | String | **Required.** InfluxDB bucket name.
auth\_token | String | **Required.** InfluxDB authentication token.
ssl\_enable | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
ssl\_ca\_cert | String | **Optional.** Path to CA certificate to validate the remote host.
ssl\_cert | String | **Optional.** Path to host certificate to present to the remote host for mutual verification.
ssl\_key | String | **Optional.** Path to host key to accompany the ssl\_cert.
host\_template | Dictionary | **Required.** Host template to define the InfluxDB line protocol.
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
or similar.
### LiveStatusListener <a id="objecttype-livestatuslistener"></a> ### LiveStatusListener <a id="objecttype-livestatuslistener"></a>
Livestatus API interface available as TCP or UNIX socket. Historical table queries Livestatus API interface available as TCP or UNIX socket. Historical table queries

View File

@ -57,12 +57,19 @@ Integration in Icinga Web 2 is possible by installing the official [graphite mod
Its written in Go and has no external dependencies. Its written in Go and has no external dependencies.
Use the [InfluxdbWriter](14-features.md#influxdb-writer) feature Use the [InfluxdbWriter](14-features.md#influxdb-writer) feature
for sending real-time metrics from Icinga 2 to InfluxDB. for sending real-time metrics from Icinga 2 to InfluxDB v1.
```bash ```bash
icinga2 feature enable influxdb icinga2 feature enable influxdb
``` ```
Use the [Influxdb2Writer](14-features.md#influxdb-writer) feature
for sending real-time metrics from Icinga 2 to InfluxDB v2.
```bash
icinga2 feature enable influxdb2
```
A popular frontend for InfluxDB is for example [Grafana](https://grafana.org). A popular frontend for InfluxDB is for example [Grafana](https://grafana.org).
Integration in Icinga Web 2 is possible by installing the community [Grafana module](https://github.com/Mikesch-mp/icingaweb2-module-grafana). Integration in Icinga Web 2 is possible by installing the community [Grafana module](https://github.com/Mikesch-mp/icingaweb2-module-grafana).

View File

@ -369,7 +369,7 @@ where Carbon Cache/Relay is running as receiver.
### InfluxDB Writer <a id="influxdb-writer"></a> ### InfluxDB Writer <a id="influxdb-writer"></a>
Once there are new metrics available, Icinga 2 will directly write them to the Once there are new metrics available, Icinga 2 will directly write them to the
defined InfluxDB HTTP API. defined InfluxDB v1/v2 HTTP API.
You can enable the feature using You can enable the feature using
@ -377,10 +377,17 @@ You can enable the feature using
icinga2 feature enable influxdb icinga2 feature enable influxdb
``` ```
By default the [InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter) feature or
expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
Measurement names and tags are fully configurable by the end user. The InfluxdbWriter ```bash
icinga2 feature enable influxdb2
```
By default the
[InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter)/[Influxdb2Writer](09-object-types.md#objecttype-influxdb2writer)
features expect the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
Measurement names and tags are fully configurable by the end user. The Influxdb(2)Writer
object will automatically add a `metric` tag to each data point. This correlates to the object will automatically add a `metric` tag to each data point. This correlates to the
perfdata label. Fields (value, warn, crit, min, max, unit) are created from data if available perfdata label. Fields (value, warn, crit, min, max, unit) are created from data if available
and the configuration allows it. If a value associated with a tag is not able to be and the configuration allows it. If a value associated with a tag is not able to be
@ -391,12 +398,13 @@ escape characters when followed by a space or comma, but cannot be escaped thems
As a result all trailling slashes in these fields are replaced with an underscore. This As a result all trailling slashes in these fields are replaced with an underscore. This
predominantly affects Windows paths e.g. `C:\` becomes `C:_`. predominantly affects Windows paths e.g. `C:\` becomes `C:_`.
The database is assumed to exist so this object will make no attempt to create it currently. The database/bucket is assumed to exist so this object will make no attempt to create it currently.
If [SELinux](22-selinux.md#selinux) is enabled, it will not allow access for Icinga 2 to InfluxDB until the [boolean](22-selinux.md#selinux-policy-booleans) If [SELinux](22-selinux.md#selinux) is enabled, it will not allow access for Icinga 2 to InfluxDB until the [boolean](22-selinux.md#selinux-policy-booleans)
`icinga2_can_connect_all` is set to true as InfluxDB is not providing its own policy. `icinga2_can_connect_all` is set to true as InfluxDB is not providing its own policy.
More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter). More configuration details can be found [here for v1](09-object-types.md#objecttype-influxdbwriter)
and [here for v2](09-object-types.md#objecttype-influxdb2writer).
#### Instance Tagging <a id="influxdb-writer-instance-tags"></a> #### Instance Tagging <a id="influxdb-writer-instance-tags"></a>

View File

@ -1,6 +1,6 @@
/** /**
* The InfluxdbWriter type writes check result metrics and * The InfluxdbWriter type writes check result metrics and
* performance data to an InfluxDB HTTP API * performance data to an InfluxDB v1 HTTP API
*/ */
object InfluxdbWriter "influxdb" { object InfluxdbWriter "influxdb" {

View File

@ -0,0 +1,27 @@
/**
* The Influxdb2Writer type writes check result metrics and
* performance data to an InfluxDB v2 HTTP API
*/
object Influxdb2Writer "influxdb2" {
//host = "127.0.0.1"
//port = 8086
//organization = "monitoring"
//bucket = "icinga2"
//auth_token = "ABCDEvwxyz0189-_"
//flush_threshold = 1024
//flush_interval = 10s
//host_template = {
// measurement = "$host.check_command$"
// tags = {
// hostname = "$host.name$"
// }
//}
//service_template = {
// measurement = "$service.check_command$"
// tags = {
// hostname = "$host.name$"
// service = "$service.name$"
// }
//}
}

View File

@ -2,7 +2,9 @@
mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp) mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp)
mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp) mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp)
mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp)
mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp) mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp)
mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp)
mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp) mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp)
mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp) mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp)
mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp) mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp)
@ -11,7 +13,9 @@ set(perfdata_SOURCES
elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp
gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp
graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp
influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp
influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
) )
@ -44,6 +48,11 @@ install_if_not_exists(
${ICINGA2_CONFIGDIR}/features-available ${ICINGA2_CONFIGDIR}/features-available
) )
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb2.conf
${ICINGA2_CONFIGDIR}/features-available
)
install_if_not_exists( install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf
${ICINGA2_CONFIGDIR}/features-available ${ICINGA2_CONFIGDIR}/features-available

View File

@ -0,0 +1,44 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#include "perfdata/influxdb2writer.hpp"
#include "perfdata/influxdb2writer-ti.cpp"
#include "remote/url.hpp"
#include "base/configtype.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
#include <utility>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
using namespace icinga;
REGISTER_TYPE(Influxdb2Writer);
REGISTER_STATSFUNCTION(Influxdb2Writer, &Influxdb2Writer::StatsFunc);
void Influxdb2Writer::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
InfluxdbCommonWriter::StatsFunc<Influxdb2Writer>(status, perfdata);
}
boost::beast::http::request<boost::beast::http::string_body> Influxdb2Writer::AssembleRequest(String body)
{
auto request (AssembleBaseRequest(std::move(body)));
request.set(boost::beast::http::field::authorization, "Token " + GetAuthToken());
return std::move(request);
}
Url::Ptr Influxdb2Writer::AssembleUrl()
{
auto url (AssembleBaseUrl());
std::vector<String> path ({"api", "v2", "write"});
url->SetPath(path);
url->AddQueryElement("org", GetOrganization());
url->AddQueryElement("bucket", GetBucket());
return std::move(url);
}

View File

@ -0,0 +1,33 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#ifndef INFLUXDB2WRITER_H
#define INFLUXDB2WRITER_H
#include "perfdata/influxdb2writer-ti.hpp"
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
namespace icinga
{
/**
* An Icinga InfluxDB v2 writer.
*
* @ingroup perfdata
*/
class Influxdb2Writer final : public ObjectImpl<Influxdb2Writer>
{
public:
DECLARE_OBJECT(Influxdb2Writer);
DECLARE_OBJECTNAME(Influxdb2Writer);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
protected:
boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override;
Url::Ptr AssembleUrl() override;
};
}
#endif /* INFLUXDB2WRITER_H */

View File

@ -0,0 +1,19 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#include "perfdata/influxdbcommonwriter.hpp"
library perfdata;
namespace icinga
{
class Influxdb2Writer : InfluxdbCommonWriter
{
activation_priority 100;
[config, required] String organization;
[config, required] String bucket;
[config, required, no_user_view] String auth_token;
};
}

View File

@ -0,0 +1,586 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#include "perfdata/influxdbcommonwriter.hpp"
#include "perfdata/influxdbcommonwriter-ti.cpp"
#include "remote/url.hpp"
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/stream.hpp"
#include "base/json.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include "base/tlsutility.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility>
using namespace icinga;
REGISTER_TYPE(InfluxdbCommonWriter);
class InfluxdbInteger final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
InfluxdbInteger(int value)
: m_Value(value)
{ }
int GetValue() const
{
return m_Value;
}
private:
int m_Value;
};
void InfluxdbCommonWriter::OnConfigLoaded()
{
ObjectImpl<InfluxdbCommonWriter>::OnConfigLoaded();
m_WorkQueue.SetName(GetReflectionType()->GetName() + ", " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, GetReflectionType()->GetName())
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void InfluxdbCommonWriter::Resume()
{
ObjectImpl<InfluxdbCommonWriter>::Resume();
Log(LogInformation, GetReflectionType()->GetName())
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Register for new metrics. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr);
});
}
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
void InfluxdbCommonWriter::Pause()
{
/* Force a flush. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing pending data buffers.";
Flush();
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Joining existing WQ tasks.";
m_WorkQueue.Join();
/* Flush again after the WQ tasks have filled the data buffer. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing data buffers from WQ tasks.";
Flush();
Log(LogInformation, GetReflectionType()->GetName())
<< "'" << GetName() << "' paused.";
ObjectImpl<InfluxdbCommonWriter>::Pause();
}
void InfluxdbCommonWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, GetReflectionType()->GetName(), "Exception during InfluxDB operation: Verify that your backend is operational!");
Log(LogDebug, GetReflectionType()->GetName())
<< "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
//TODO: Close the connection, if we keep it open.
}
OptionalTlsStream InfluxdbCommonWriter::Connect()
{
Log(LogNotice, GetReflectionType()->GetName())
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
OptionalTlsStream stream;
bool ssl = GetSslEnable();
if (ssl) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to create SSL context.";
throw;
}
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (ssl) {
auto& tlsStream (stream.first->next_layer());
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "TLS handshake with host '" << GetHost() << "' failed.";
throw;
}
}
return std::move(stream);
}
void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
}
void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
MacroProcessor::ResolverList resolvers;
if (service)
resolvers.emplace_back("service", service);
resolvers.emplace_back("host", host);
resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
String prefix;
double ts = cr->GetExecutionEnd();
// Clone the template and perform an in-place macro expansion of measurement and tag values
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone());
tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
Dictionary::Ptr tagsClean = tmpl->Get("tags");
if (tagsClean) {
Dictionary::Ptr tags = new Dictionary();
{
ObjectLock olock(tagsClean);
for (const Dictionary::Pair& pair : tagsClean) {
String missing_macro;
Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
if (missing_macro.IsEmpty()) {
tags->Set(pair.first, value);
}
}
}
tmpl->Set("tags", tags);
}
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
if (GetEnableSendMetadata()) {
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
if (service)
fields->Set("state", new InfluxdbInteger(service->GetState()));
else
fields->Set("state", new InfluxdbInteger(host->GetState()));
fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
fields->Set("reachable", checkable->IsReachable());
fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
fields->Set("latency", cr->CalculateLatency());
fields->Set("execution_time", cr->CalculateExecutionTime());
SendMetric(checkable, tmpl, Empty, fields, ts);
}
}
String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str)
{
// Iterate over the key name and escape commas and spaces with a backslash
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
boost::algorithm::replace_all(result, "=", "\\=");
boost::algorithm::replace_all(result, ",", "\\,");
boost::algorithm::replace_all(result, " ", "\\ ");
// InfluxDB 'feature': although backslashes are allowed in keys they also act
// as escape sequences when followed by ',' or ' '. When your tag is like
// 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
// and through experimentation they also escape '='. To be safe we replace
// trailing backslashes with and underscore.
// See https://github.com/influxdata/influxdb/issues/8587 for more info
size_t length = result.GetLength();
if (result[length - 1] == '\\')
result[length - 1] = '_';
return result;
}
String InfluxdbCommonWriter::EscapeValue(const Value& value)
{
if (value.IsObjectType<InfluxdbInteger>()) {
std::ostringstream os;
os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue() << "i";
return os.str();
}
if (value.IsBoolean())
return value ? "true" : "false";
if (value.IsString())
return "\"" + EscapeKeyOrTagValue(value) + "\"";
return value;
}
void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts)
{
std::ostringstream msgbuf;
msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
Dictionary::Ptr tags = tmpl->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
// Empty macro expansion, no tag
if (!pair.second.IsEmpty()) {
msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
}
}
}
// Label may be empty in the case of metadata
if (!label.IsEmpty())
msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
msgbuf << " ";
{
bool first = true;
ObjectLock fieldLock(fields);
for (const Dictionary::Pair& pair : fields) {
if (first)
first = false;
else
msgbuf << ",";
msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
}
}
msgbuf << " " << static_cast<unsigned long>(ts);
Log(LogDebug, GetReflectionType()->GetName())
<< "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'.";
// Buffer the data point
m_DataBuffer.emplace_back(msgbuf.str());
// Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
Log(LogDebug, GetReflectionType()->GetName())
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
try {
Flush();
} catch (...) {
/* Do nothing. */
}
}
}
void InfluxdbCommonWriter::FlushTimeout()
{
m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
}
void InfluxdbCommonWriter::FlushTimeoutWQ()
{
AssertOnWorkQueue();
Log(LogDebug, GetReflectionType()->GetName())
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
void InfluxdbCommonWriter::Flush()
{
namespace beast = boost::beast;
namespace http = beast::http;
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
if (m_DataBuffer.empty())
return;
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing data buffer to InfluxDB.";
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
OptionalTlsStream stream;
try {
stream = Connect();
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxDbWriter")
<< "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
return;
}
Defer s ([&stream]() {
if (stream.first) {
stream.first->next_layer().shutdown();
}
});
auto request (AssembleRequest(std::move(body)));
try {
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;
try {
if (stream.first) {
http::read(*stream.first, buf, parser);
} else {
http::read(*stream.second, buf, parser);
}
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
throw;
}
auto& response (parser.get());
if (response.result() != http::status::no_content) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unexpected response code: " << response.result();
auto& contentType (response[http::field::content_type]);
if (contentType != "application/json") {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unexpected Content-Type: " << contentType;
return;
}
Dictionary::Ptr jsonResponse;
auto& body (response.body());
try {
jsonResponse = JsonDecode(body);
} catch (...) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to parse JSON response:\n" << body;
return;
}
String error = jsonResponse->Get("error");
Log(LogCritical, GetReflectionType()->GetName())
<< "InfluxDB error message:\n" << error;
}
}
boost::beast::http::request<boost::beast::http::string_body> InfluxdbCommonWriter::AssembleBaseRequest(String body)
{
namespace http = boost::beast::http;
auto url (AssembleUrl());
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
request.body() = std::move(body);
request.content_length(request.body().size());
return std::move(request);
}
Url::Ptr InfluxdbCommonWriter::AssembleBaseUrl()
{
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
url->SetHost(GetHost());
url->SetPort(GetPort());
url->AddQueryElement("precision", "s");
return std::move(url);
}
void InfluxdbCommonWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbCommonWriter>::ValidateHostTemplate(lvalue, utils);
String measurement = lvalue()->Get("measurement");
if (!MacroProcessor::ValidateMacroString(measurement))
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
Dictionary::Ptr tags = lvalue()->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
if (!MacroProcessor::ValidateMacroString(pair.second))
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
}
}
}
void InfluxdbCommonWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbCommonWriter>::ValidateServiceTemplate(lvalue, utils);
String measurement = lvalue()->Get("measurement");
if (!MacroProcessor::ValidateMacroString(measurement))
BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
Dictionary::Ptr tags = lvalue()->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
if (!MacroProcessor::ValidateMacroString(pair.second))
BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
}
}
}

View File

@ -0,0 +1,99 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#ifndef INFLUXDBCOMMONWRITER_H
#define INFLUXDBCOMMONWRITER_H
#include "perfdata/influxdbcommonwriter-ti.hpp"
#include "icinga/service.hpp"
#include "base/configobject.hpp"
#include "base/perfdatavalue.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include "remote/url.hpp"
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <fstream>
namespace icinga
{
/**
* Common base class for InfluxDB v1/v2 writers.
*
* @ingroup perfdata
*/
class InfluxdbCommonWriter : public ObjectImpl<InfluxdbCommonWriter>
{
public:
DECLARE_OBJECT(InfluxdbCommonWriter);
template<class InfluxWriter>
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
protected:
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
void OnConfigLoaded() override;
void Resume() override;
void Pause() override;
boost::beast::http::request<boost::beast::http::string_body> AssembleBaseRequest(String body);
Url::Ptr AssembleBaseUrl();
virtual boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) = 0;
virtual Url::Ptr AssembleUrl() = 0;
private:
Timer::Ptr m_FlushTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout();
void FlushTimeoutWQ();
void Flush();
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);
OptionalTlsStream Connect();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);
};
template<class InfluxWriter>
void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
auto typeName (InfluxWriter::TypeInstance->GetName().ToLower());
for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) {
size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxwriter->m_DataBuffer.size();
nodes.emplace_back(influxwriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "data_buffer_items", dataBufferItems }
}));
perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems));
}
status->Set(typeName, new Dictionary(std::move(nodes)));
}
}
#endif /* INFLUXDBCOMMONWRITER_H */

View File

@ -0,0 +1,85 @@
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
#include "base/configobject.hpp"
library perfdata;
namespace icinga
{
abstract class InfluxdbCommonWriter : ConfigObject
{
[config, required] String host {
default {{{ return "127.0.0.1"; }}}
};
[config, required] String port {
default {{{ return "8086"; }}}
};
[config] bool ssl_enable {
default {{{ return false; }}}
};
[config] String ssl_ca_cert {
default {{{ return ""; }}}
};
[config] String ssl_cert {
default {{{ return ""; }}}
};
[config] String ssl_key{
default {{{ return ""; }}}
};
[config, required] Dictionary::Ptr host_template {
default {{{
return new Dictionary({
{ "measurement", "$host.check_command$" },
{ "tags", new Dictionary({
{ "hostname", "$host.name$" }
}) }
});
}}}
};
[config, required] Dictionary::Ptr service_template {
default {{{
return new Dictionary({
{ "measurement", "$service.check_command$" },
{ "tags", new Dictionary({
{ "hostname", "$host.name$" },
{ "service", "$service.name$" }
}) }
});
}}}
};
[config] bool enable_send_thresholds {
default {{{ return false; }}}
};
[config] bool enable_send_metadata {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 10; }}}
};
[config] int flush_threshold {
default {{{ return 1024; }}}
};
[config] bool enable_ha {
default {{{ return false; }}}
};
};
validator InfluxdbCommonWriter {
Dictionary host_template {
required measurement;
String measurement;
Dictionary "tags" {
String "*";
};
};
Dictionary service_template {
required measurement;
String measurement;
Dictionary "tags" {
String "*";
};
};
};
}

View File

@ -2,618 +2,55 @@
#include "perfdata/influxdbwriter.hpp" #include "perfdata/influxdbwriter.hpp"
#include "perfdata/influxdbwriter-ti.cpp" #include "perfdata/influxdbwriter-ti.cpp"
#include "remote/url.hpp"
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/stream.hpp"
#include "base/json.hpp"
#include "base/base64.hpp" #include "base/base64.hpp"
#include "base/networkstream.hpp" #include "remote/url.hpp"
#include "base/exception.hpp" #include "base/configtype.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp" #include "base/statsfunction.hpp"
#include "base/tlsutility.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp> #include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp> #include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility> #include <utility>
using namespace icinga; using namespace icinga;
class InfluxdbInteger final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
InfluxdbInteger(int value)
: m_Value(value)
{ }
int GetValue() const
{
return m_Value;
}
private:
int m_Value;
};
REGISTER_TYPE(InfluxdbWriter); REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
void InfluxdbWriter::OnConfigLoaded()
{
ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "InfluxdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{ {
DictionaryData nodes; InfluxdbCommonWriter::StatsFunc<InfluxdbWriter>(status, perfdata);
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "data_buffer_items", dataBufferItems }
}));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems));
} }
status->Set("influxdbwriter", new Dictionary(std::move(nodes))); boost::beast::http::request<boost::beast::http::string_body> InfluxdbWriter::AssembleRequest(String body)
}
void InfluxdbWriter::Resume()
{ {
ObjectImpl<InfluxdbWriter>::Resume(); auto request (AssembleBaseRequest(std::move(body)));
Dictionary::Ptr basicAuth = GetBasicAuth();
Log(LogInformation, "InfluxdbWriter") if (basicAuth) {
<< "'" << GetName() << "' resumed."; request.set(
boost::beast::http::field::authorization,
/* Register exception handler for WQ tasks. */ "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password"))
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); );
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Register for new metrics. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr);
});
} }
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ return std::move(request);
void InfluxdbWriter::Pause() }
Url::Ptr InfluxdbWriter::AssembleUrl()
{ {
/* Force a flush. */ auto url (AssembleBaseUrl());
Log(LogDebug, "InfluxdbWriter")
<< "Flushing pending data buffers.";
Flush();
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
Log(LogDebug, "InfluxdbWriter")
<< "Joining existing WQ tasks.";
m_WorkQueue.Join();
/* Flush again after the WQ tasks have filled the data buffer. */
Log(LogDebug, "InfluxdbWriter")
<< "Flushing data buffers from WQ tasks.";
Flush();
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' paused.";
ObjectImpl<InfluxdbWriter>::Pause();
}
void InfluxdbWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
Log(LogDebug, "InfluxdbWriter")
<< "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
//TODO: Close the connection, if we keep it open.
}
OptionalTlsStream InfluxdbWriter::Connect()
{
Log(LogNotice, "InfluxdbWriter")
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
OptionalTlsStream stream;
bool ssl = GetSslEnable();
if (ssl) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to create SSL context.";
throw;
}
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (ssl) {
auto& tlsStream (stream.first->next_layer());
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "TLS handshake with host '" << GetHost() << "' failed.";
throw;
}
}
return std::move(stream);
}
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
}
void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
MacroProcessor::ResolverList resolvers;
if (service)
resolvers.emplace_back("service", service);
resolvers.emplace_back("host", host);
resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
String prefix;
double ts = cr->GetExecutionEnd();
// Clone the template and perform an in-place macro expansion of measurement and tag values
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone());
tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
Dictionary::Ptr tagsClean = tmpl->Get("tags");
if (tagsClean) {
Dictionary::Ptr tags = new Dictionary();
{
ObjectLock olock(tagsClean);
for (const Dictionary::Pair& pair : tagsClean) {
String missing_macro;
Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
if (missing_macro.IsEmpty()) {
tags->Set(pair.first, value);
}
}
}
tmpl->Set("tags", tags);
}
CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "InfluxdbWriter")
<< "Ignoring invalid perfdata for checkable '"
<< checkable->GetName() << "' and command '"
<< checkCommand->GetName() << "' with value: " << val;
continue;
}
}
Dictionary::Ptr fields = new Dictionary();
fields->Set("value", pdv->GetValue());
if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
fields->Set("crit", pdv->GetCrit());
if (!pdv->GetWarn().IsEmpty())
fields->Set("warn", pdv->GetWarn());
if (!pdv->GetMin().IsEmpty())
fields->Set("min", pdv->GetMin());
if (!pdv->GetMax().IsEmpty())
fields->Set("max", pdv->GetMax());
}
if (!pdv->GetUnit().IsEmpty()) {
fields->Set("unit", pdv->GetUnit());
}
SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
}
}
if (GetEnableSendMetadata()) {
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
if (service)
fields->Set("state", new InfluxdbInteger(service->GetState()));
else
fields->Set("state", new InfluxdbInteger(host->GetState()));
fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
fields->Set("reachable", checkable->IsReachable());
fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
fields->Set("latency", cr->CalculateLatency());
fields->Set("execution_time", cr->CalculateExecutionTime());
SendMetric(checkable, tmpl, Empty, fields, ts);
}
}
String InfluxdbWriter::EscapeKeyOrTagValue(const String& str)
{
// Iterate over the key name and escape commas and spaces with a backslash
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
boost::algorithm::replace_all(result, "=", "\\=");
boost::algorithm::replace_all(result, ",", "\\,");
boost::algorithm::replace_all(result, " ", "\\ ");
// InfluxDB 'feature': although backslashes are allowed in keys they also act
// as escape sequences when followed by ',' or ' '. When your tag is like
// 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
// and through experimentation they also escape '='. To be safe we replace
// trailing backslashes with and underscore.
// See https://github.com/influxdata/influxdb/issues/8587 for more info
size_t length = result.GetLength();
if (result[length - 1] == '\\')
result[length - 1] = '_';
return result;
}
String InfluxdbWriter::EscapeValue(const Value& value)
{
if (value.IsObjectType<InfluxdbInteger>()) {
std::ostringstream os;
os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue() << "i";
return os.str();
}
if (value.IsBoolean())
return value ? "true" : "false";
if (value.IsString())
return "\"" + EscapeKeyOrTagValue(value) + "\"";
return value;
}
void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts)
{
std::ostringstream msgbuf;
msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
Dictionary::Ptr tags = tmpl->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
// Empty macro expansion, no tag
if (!pair.second.IsEmpty()) {
msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
}
}
}
// Label may be empty in the case of metadata
if (!label.IsEmpty())
msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
msgbuf << " ";
{
bool first = true;
ObjectLock fieldLock(fields);
for (const Dictionary::Pair& pair : fields) {
if (first)
first = false;
else
msgbuf << ",";
msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
}
}
msgbuf << " " << static_cast<unsigned long>(ts);
Log(LogDebug, "InfluxdbWriter")
<< "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'.";
// Buffer the data point
m_DataBuffer.emplace_back(msgbuf.str());
// Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
Log(LogDebug, "InfluxdbWriter")
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
try {
Flush();
} catch (...) {
/* Do nothing. */
}
}
}
void InfluxdbWriter::FlushTimeout()
{
m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
}
void InfluxdbWriter::FlushTimeoutWQ()
{
AssertOnWorkQueue();
Log(LogDebug, "InfluxdbWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
void InfluxdbWriter::Flush()
{
namespace beast = boost::beast;
namespace http = beast::http;
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
if (m_DataBuffer.empty())
return;
Log(LogDebug, "InfluxdbWriter")
<< "Flushing data buffer to InfluxDB.";
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
OptionalTlsStream stream;
try {
stream = Connect();
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxDbWriter")
<< "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
return;
}
Defer s ([&stream]() {
if (stream.first) {
stream.first->next_layer().shutdown();
}
});
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
url->SetHost(GetHost());
url->SetPort(GetPort());
std::vector<String> path; std::vector<String> path;
path.emplace_back("write"); path.emplace_back("write");
url->SetPath(path); url->SetPath(path);
url->AddQueryElement("db", GetDatabase()); url->AddQueryElement("db", GetDatabase());
url->AddQueryElement("precision", "s");
if (!GetUsername().IsEmpty()) if (!GetUsername().IsEmpty())
url->AddQueryElement("u", GetUsername()); url->AddQueryElement("u", GetUsername());
if (!GetPassword().IsEmpty()) if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword()); url->AddQueryElement("p", GetPassword());
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10); return std::move(url);
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
{
Dictionary::Ptr basicAuth = GetBasicAuth();
if (basicAuth) {
request.set(
http::field::authorization,
"Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password"))
);
} }
}
request.body() = body;
request.content_length(request.body().size());
try {
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;
try {
if (stream.first) {
http::read(*stream.first, buf, parser);
} else {
http::read(*stream.second, buf, parser);
}
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
throw;
}
auto& response (parser.get());
if (response.result() != http::status::no_content) {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected response code: " << response.result();
auto& contentType (response[http::field::content_type]);
if (contentType != "application/json") {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}
Dictionary::Ptr jsonResponse;
auto& body (response.body());
try {
jsonResponse = JsonDecode(body);
} catch (...) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to parse JSON response:\n" << body;
return;
}
String error = jsonResponse->Get("error");
Log(LogCritical, "InfluxdbWriter")
<< "InfluxDB error message:\n" << error;
}
}
void InfluxdbWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(lvalue, utils);
String measurement = lvalue()->Get("measurement");
if (!MacroProcessor::ValidateMacroString(measurement))
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
Dictionary::Ptr tags = lvalue()->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
if (!MacroProcessor::ValidateMacroString(pair.second))
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
}
}
}
void InfluxdbWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbWriter>::ValidateServiceTemplate(lvalue, utils);
String measurement = lvalue()->Get("measurement");
if (!MacroProcessor::ValidateMacroString(measurement))
BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
Dictionary::Ptr tags = lvalue()->Get("tags");
if (tags) {
ObjectLock olock(tags);
for (const Dictionary::Pair& pair : tags) {
if (!MacroProcessor::ValidateMacroString(pair.second))
BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
}
}
}

View File

@ -4,19 +4,12 @@
#define INFLUXDBWRITER_H #define INFLUXDBWRITER_H
#include "perfdata/influxdbwriter-ti.hpp" #include "perfdata/influxdbwriter-ti.hpp"
#include "icinga/service.hpp"
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include <fstream>
namespace icinga namespace icinga
{ {
/** /**
* An Icinga InfluxDB writer. * An Icinga InfluxDB v1 writer.
* *
* @ingroup perfdata * @ingroup perfdata
*/ */
@ -28,35 +21,9 @@ public:
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
protected: protected:
void OnConfigLoaded() override; boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override;
void Resume() override; Url::Ptr AssembleUrl() override;
void Pause() override;
private:
WorkQueue m_WorkQueue{10000000, 1};
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout();
void FlushTimeoutWQ();
void Flush();
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);
OptionalTlsStream Connect();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);
}; };
} }

View File

@ -1,22 +1,16 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/configobject.hpp" #include "perfdata/influxdbcommonwriter.hpp"
library perfdata; library perfdata;
namespace icinga namespace icinga
{ {
class InfluxdbWriter : ConfigObject class InfluxdbWriter : InfluxdbCommonWriter
{ {
activation_priority 100; activation_priority 100;
[config, required] String host {
default {{{ return "127.0.0.1"; }}}
};
[config, required] String port {
default {{{ return "8086"; }}}
};
[config, required] String database { [config, required] String database {
default {{{ return "icinga2"; }}} default {{{ return "icinga2"; }}}
}; };
@ -26,72 +20,10 @@ class InfluxdbWriter : ConfigObject
[config, no_user_view] String password { [config, no_user_view] String password {
default {{{ return ""; }}} default {{{ return ""; }}}
}; };
[config] bool ssl_enable {
default {{{ return false; }}}
};
[config] String ssl_ca_cert {
default {{{ return ""; }}}
};
[config] String ssl_cert {
default {{{ return ""; }}}
};
[config] String ssl_key{
default {{{ return ""; }}}
};
[config, no_user_view] Dictionary::Ptr basic_auth; [config, no_user_view] Dictionary::Ptr basic_auth;
[config, required] Dictionary::Ptr host_template {
default {{{
return new Dictionary({
{ "measurement", "$host.check_command$" },
{ "tags", new Dictionary({
{ "hostname", "$host.name$" }
}) }
});
}}}
};
[config, required] Dictionary::Ptr service_template {
default {{{
return new Dictionary({
{ "measurement", "$service.check_command$" },
{ "tags", new Dictionary({
{ "hostname", "$host.name$" },
{ "service", "$service.name$" }
}) }
});
}}}
};
[config] bool enable_send_thresholds {
default {{{ return false; }}}
};
[config] bool enable_send_metadata {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 10; }}}
};
[config] int flush_threshold {
default {{{ return 1024; }}}
};
[config] bool enable_ha {
default {{{ return false; }}}
};
}; };
validator InfluxdbWriter { validator InfluxdbWriter {
Dictionary host_template {
required measurement;
String measurement;
Dictionary "tags" {
String "*";
};
};
Dictionary service_template {
required measurement;
String measurement;
Dictionary "tags" {
String "*";
};
};
Dictionary basic_auth { Dictionary basic_auth {
required username; required username;
String username; String username;

View File

@ -57,7 +57,7 @@ syn keyword icinga2ObjType Comment Dependency Downtime ElasticsearchWriter
syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener
syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup
syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection
syn keyword icinga2ObjType InfluxdbWriter LivestatusListener Notification NotificationCommand syn keyword icinga2ObjType InfluxdbWriter Influxdb2Writer LivestatusListener Notification NotificationCommand
syn keyword icinga2ObjType NotificationComponent OpenTsdbWriter PerfdataWriter syn keyword icinga2ObjType NotificationComponent OpenTsdbWriter PerfdataWriter
syn keyword icinga2ObjType ScheduledDowntime Service ServiceGroup SyslogLogger syn keyword icinga2ObjType ScheduledDowntime Service ServiceGroup SyslogLogger
syn keyword icinga2ObjType TimePeriod User UserGroup WindowsEventLogLogger Zone syn keyword icinga2ObjType TimePeriod User UserGroup WindowsEventLogLogger Zone