diff --git a/doc/06-distributed-monitoring.md b/doc/06-distributed-monitoring.md
index 1531451ba..ca47bccb5 100644
--- a/doc/06-distributed-monitoring.md
+++ b/doc/06-distributed-monitoring.md
@@ -3001,7 +3001,7 @@ By default, the following features provide advanced HA functionality:
* [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter)
* [Gelf](09-object-types.md#objecttype-gelfwriter)
* [Graphite](09-object-types.md#objecttype-graphitewriter)
-* [InfluxDB](09-object-types.md#objecttype-influxdbwriter)
+* [InfluxDB](09-object-types.md#objecttype-influxdb2writer) (v1 and v2)
* [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter)
* [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP)
diff --git a/doc/09-object-types.md b/doc/09-object-types.md
index a1ac37cea..19e796aff 100644
--- a/doc/09-object-types.md
+++ b/doc/09-object-types.md
@@ -1604,8 +1604,9 @@ Runtime Attributes:
### InfluxdbWriter
-Writes check result metrics and performance data to a defined InfluxDB host.
+Writes check result metrics and performance data to a defined InfluxDB v1 host.
This configuration object is available as [influxdb feature](14-features.md#influxdb-writer).
+For InfluxDB v2 support see the [Influxdb2Writer](#objecttype-influxdb2writer) below.
Example:
@@ -1669,6 +1670,68 @@ or similar.
+### Influxdb2Writer
+
+Writes check result metrics and performance data to a defined InfluxDB v2 host.
+This configuration object is available as [influxdb feature](14-features.md#influxdb-writer).
+For InfluxDB v1 support see the [InfluxdbWriter](#objecttype-influxdbwriter) above.
+
+Example:
+
+```
+object Influxdb2Writer "influxdb2" {
+ host = "127.0.0.1"
+ port = 8086
+ organization = "monitoring"
+ bucket = "icinga2"
+ auth_token = "ABCDEvwxyz0189-_"
+
+ flush_threshold = 1024
+ flush_interval = 10s
+
+ host_template = {
+ measurement = "$host.check_command$"
+ tags = {
+ hostname = "$host.name$"
+ }
+ }
+ service_template = {
+ measurement = "$service.check_command$"
+ tags = {
+ hostname = "$host.name$"
+ service = "$service.name$"
+ }
+ }
+}
+```
+
+Configuration Attributes:
+
+ Name | Type | Description
+ --------------------------|-----------------------|----------------------------------
+ host | String | **Required.** InfluxDB host address. Defaults to `127.0.0.1`.
+ port | Number | **Required.** InfluxDB HTTP port. Defaults to `8086`.
+ organization | String | **Required.** InfluxDB organization name.
+ bucket | String | **Required.** InfluxDB bucket name.
+ auth\_token | String | **Required.** InfluxDB authentication token.
+ ssl\_enable | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
+ ssl\_ca\_cert | String | **Optional.** Path to CA certificate to validate the remote host.
+ ssl\_cert | String | **Optional.** Path to host certificate to present to the remote host for mutual verification.
+ ssl\_key | String | **Optional.** Path to host key to accompany the ssl\_cert.
+ host\_template | Dictionary | **Required.** Host template to define the InfluxDB line protocol.
+ service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
+ enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
+ enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
+ flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
+ flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
+ enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
+
+Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
+to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
+or similar.
+
+
+
### LiveStatusListener
Livestatus API interface available as TCP or UNIX socket. Historical table queries
diff --git a/doc/13-addons.md b/doc/13-addons.md
index 4923f36f0..e212c3734 100644
--- a/doc/13-addons.md
+++ b/doc/13-addons.md
@@ -57,12 +57,19 @@ Integration in Icinga Web 2 is possible by installing the official [graphite mod
It’s written in Go and has no external dependencies.
Use the [InfluxdbWriter](14-features.md#influxdb-writer) feature
-for sending real-time metrics from Icinga 2 to InfluxDB.
+for sending real-time metrics from Icinga 2 to InfluxDB v1.
```bash
icinga2 feature enable influxdb
```
+Use the [Influxdb2Writer](14-features.md#influxdb-writer) feature
+for sending real-time metrics from Icinga 2 to InfluxDB v2.
+
+```bash
+icinga2 feature enable influxdb2
+```
+
A popular frontend for InfluxDB is for example [Grafana](https://grafana.org).
Integration in Icinga Web 2 is possible by installing the community [Grafana module](https://github.com/Mikesch-mp/icingaweb2-module-grafana).
diff --git a/doc/14-features.md b/doc/14-features.md
index 4a1809828..5e5eea0a7 100644
--- a/doc/14-features.md
+++ b/doc/14-features.md
@@ -369,7 +369,7 @@ where Carbon Cache/Relay is running as receiver.
### InfluxDB Writer
Once there are new metrics available, Icinga 2 will directly write them to the
-defined InfluxDB HTTP API.
+defined InfluxDB v1/v2 HTTP API.
You can enable the feature using
@@ -377,10 +377,17 @@ You can enable the feature using
icinga2 feature enable influxdb
```
-By default the [InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter) feature
-expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
+or
-Measurement names and tags are fully configurable by the end user. The InfluxdbWriter
+```bash
+icinga2 feature enable influxdb2
+```
+
+By default the
+[InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter)/[Influxdb2Writer](09-object-types.md#objecttype-influxdb2writer)
+features expect the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
+
+Measurement names and tags are fully configurable by the end user. The Influxdb(2)Writer
object will automatically add a `metric` tag to each data point. This correlates to the
perfdata label. Fields (value, warn, crit, min, max, unit) are created from data if available
and the configuration allows it. If a value associated with a tag is not able to be
@@ -391,12 +398,13 @@ escape characters when followed by a space or comma, but cannot be escaped thems
As a result all trailling slashes in these fields are replaced with an underscore. This
predominantly affects Windows paths e.g. `C:\` becomes `C:_`.
-The database is assumed to exist so this object will make no attempt to create it currently.
+The database/bucket is assumed to exist so this object will make no attempt to create it currently.
If [SELinux](22-selinux.md#selinux) is enabled, it will not allow access for Icinga 2 to InfluxDB until the [boolean](22-selinux.md#selinux-policy-booleans)
`icinga2_can_connect_all` is set to true as InfluxDB is not providing its own policy.
-More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter).
+More configuration details can be found [here for v1](09-object-types.md#objecttype-influxdbwriter)
+and [here for v2](09-object-types.md#objecttype-influxdb2writer).
#### Instance Tagging
diff --git a/etc/icinga2/features-available/influxdb.conf b/etc/icinga2/features-available/influxdb.conf
index af8423577..f0af37bdb 100644
--- a/etc/icinga2/features-available/influxdb.conf
+++ b/etc/icinga2/features-available/influxdb.conf
@@ -1,6 +1,6 @@
/**
* The InfluxdbWriter type writes check result metrics and
- * performance data to an InfluxDB HTTP API
+ * performance data to an InfluxDB v1 HTTP API
*/
object InfluxdbWriter "influxdb" {
diff --git a/etc/icinga2/features-available/influxdb2.conf b/etc/icinga2/features-available/influxdb2.conf
new file mode 100644
index 000000000..53f7a217b
--- /dev/null
+++ b/etc/icinga2/features-available/influxdb2.conf
@@ -0,0 +1,27 @@
+/**
+ * The Influxdb2Writer type writes check result metrics and
+ * performance data to an InfluxDB v2 HTTP API
+ */
+
+object Influxdb2Writer "influxdb2" {
+ //host = "127.0.0.1"
+ //port = 8086
+ //organization = "monitoring"
+ //bucket = "icinga2"
+ //auth_token = "ABCDEvwxyz0189-_"
+ //flush_threshold = 1024
+ //flush_interval = 10s
+ //host_template = {
+ // measurement = "$host.check_command$"
+ // tags = {
+ // hostname = "$host.name$"
+ // }
+ //}
+ //service_template = {
+ // measurement = "$service.check_command$"
+ // tags = {
+ // hostname = "$host.name$"
+ // service = "$service.name$"
+ // }
+ //}
+}
diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt
index c7fba5851..168938c42 100644
--- a/lib/perfdata/CMakeLists.txt
+++ b/lib/perfdata/CMakeLists.txt
@@ -2,7 +2,9 @@
mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp)
mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp)
+mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp)
mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp)
+mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp)
mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp)
mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp)
mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp)
@@ -11,7 +13,9 @@ set(perfdata_SOURCES
elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp
gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp
graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp
+ influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp
influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp
+ influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
)
@@ -44,6 +48,11 @@ install_if_not_exists(
${ICINGA2_CONFIGDIR}/features-available
)
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb2.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf
${ICINGA2_CONFIGDIR}/features-available
diff --git a/lib/perfdata/influxdb2writer.cpp b/lib/perfdata/influxdb2writer.cpp
new file mode 100644
index 000000000..57fc94e23
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.cpp
@@ -0,0 +1,44 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdb2writer.hpp"
+#include "perfdata/influxdb2writer-ti.cpp"
+#include "remote/url.hpp"
+#include "base/configtype.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/statsfunction.hpp"
+#include
+#include
+#include
+
+using namespace icinga;
+
+REGISTER_TYPE(Influxdb2Writer);
+
+REGISTER_STATSFUNCTION(Influxdb2Writer, &Influxdb2Writer::StatsFunc);
+
+void Influxdb2Writer::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ InfluxdbCommonWriter::StatsFunc(status, perfdata);
+}
+
+boost::beast::http::request Influxdb2Writer::AssembleRequest(String body)
+{
+ auto request (AssembleBaseRequest(std::move(body)));
+
+ request.set(boost::beast::http::field::authorization, "Token " + GetAuthToken());
+
+ return std::move(request);
+}
+
+Url::Ptr Influxdb2Writer::AssembleUrl()
+{
+ auto url (AssembleBaseUrl());
+
+ std::vector path ({"api", "v2", "write"});
+ url->SetPath(path);
+
+ url->AddQueryElement("org", GetOrganization());
+ url->AddQueryElement("bucket", GetBucket());
+
+ return std::move(url);
+}
diff --git a/lib/perfdata/influxdb2writer.hpp b/lib/perfdata/influxdb2writer.hpp
new file mode 100644
index 000000000..3b20f8b9d
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.hpp
@@ -0,0 +1,33 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#ifndef INFLUXDB2WRITER_H
+#define INFLUXDB2WRITER_H
+
+#include "perfdata/influxdb2writer-ti.hpp"
+#include
+#include
+
+namespace icinga
+{
+
+/**
+ * An Icinga InfluxDB v2 writer.
+ *
+ * @ingroup perfdata
+ */
+class Influxdb2Writer final : public ObjectImpl
+{
+public:
+ DECLARE_OBJECT(Influxdb2Writer);
+ DECLARE_OBJECTNAME(Influxdb2Writer);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+protected:
+ boost::beast::http::request AssembleRequest(String body) override;
+ Url::Ptr AssembleUrl() override;
+};
+
+}
+
+#endif /* INFLUXDB2WRITER_H */
diff --git a/lib/perfdata/influxdb2writer.ti b/lib/perfdata/influxdb2writer.ti
new file mode 100644
index 000000000..f80618755
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.ti
@@ -0,0 +1,19 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbcommonwriter.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class Influxdb2Writer : InfluxdbCommonWriter
+{
+ activation_priority 100;
+
+ [config, required] String organization;
+ [config, required] String bucket;
+ [config, required, no_user_view] String auth_token;
+};
+
+}
diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp
new file mode 100644
index 000000000..d85426f44
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.cpp
@@ -0,0 +1,586 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbcommonwriter.hpp"
+#include "perfdata/influxdbcommonwriter-ti.cpp"
+#include "remote/url.hpp"
+#include "icinga/service.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/icingaapplication.hpp"
+#include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
+#include "base/defer.hpp"
+#include "base/io-engine.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/stream.hpp"
+#include "base/json.hpp"
+#include "base/networkstream.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include "base/tlsutility.hpp"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace icinga;
+
+REGISTER_TYPE(InfluxdbCommonWriter);
+
+class InfluxdbInteger final : public Object
+{
+public:
+ DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
+
+ InfluxdbInteger(int value)
+ : m_Value(value)
+ { }
+
+ int GetValue() const
+ {
+ return m_Value;
+ }
+
+private:
+ int m_Value;
+};
+
+void InfluxdbCommonWriter::OnConfigLoaded()
+{
+ ObjectImpl::OnConfigLoaded();
+
+ m_WorkQueue.SetName(GetReflectionType()->GetName() + ", " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void InfluxdbCommonWriter::Resume()
+{
+ ObjectImpl::Resume();
+
+ Log(LogInformation, GetReflectionType()->GetName())
+ << "'" << GetName() << "' resumed.";
+
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Setup timer for periodically flushing m_DataBuffer */
+ m_FlushTimer = new Timer();
+ m_FlushTimer->SetInterval(GetFlushInterval());
+ m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
+ m_FlushTimer->Start();
+ m_FlushTimer->Reschedule(0);
+
+ /* Register for new metrics. */
+ Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void InfluxdbCommonWriter::Pause()
+{
+ /* Force a flush. */
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Flushing pending data buffers.";
+
+ Flush();
+
+ /* Work on the missing tasks. TODO: Find a way to cache them on disk. */
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Joining existing WQ tasks.";
+
+ m_WorkQueue.Join();
+
+ /* Flush again after the WQ tasks have filled the data buffer. */
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Flushing data buffers from WQ tasks.";
+
+ Flush();
+
+ Log(LogInformation, GetReflectionType()->GetName())
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl::Pause();
+}
+
+void InfluxdbCommonWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, GetReflectionType()->GetName(), "Exception during InfluxDB operation: Verify that your backend is operational!");
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
+
+ //TODO: Close the connection, if we keep it open.
+}
+
+OptionalTlsStream InfluxdbCommonWriter::Connect()
+{
+ Log(LogNotice, GetReflectionType()->GetName())
+ << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ OptionalTlsStream stream;
+ bool ssl = GetSslEnable();
+
+ if (ssl) {
+ Shared::Ptr sslContext;
+
+ try {
+ sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unable to create SSL context.";
+ throw;
+ }
+
+ stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+ } else {
+ stream.second = Shared::Make(IoEngine::Get().GetIoContext());
+ }
+
+ try {
+ icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ if (ssl) {
+ auto& tlsStream (stream.first->next_layer());
+
+ try {
+ tlsStream.handshake(tlsStream.client);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "TLS handshake with host '" << GetHost() << "' failed.";
+ throw;
+ }
+ }
+
+ return std::move(stream);
+}
+
+void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
+}
+
+void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Processing check result for '" + checkable->GetName() + "'");
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ MacroProcessor::ResolverList resolvers;
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+ resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
+
+ String prefix;
+
+ double ts = cr->GetExecutionEnd();
+
+ // Clone the template and perform an in-place macro expansion of measurement and tag values
+ Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
+ Dictionary::Ptr tmpl = static_pointer_cast(tmpl_clean->ShallowClone());
+ tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
+
+ Dictionary::Ptr tagsClean = tmpl->Get("tags");
+ if (tagsClean) {
+ Dictionary::Ptr tags = new Dictionary();
+
+ {
+ ObjectLock olock(tagsClean);
+ for (const Dictionary::Pair& pair : tagsClean) {
+ String missing_macro;
+ Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
+
+ if (missing_macro.IsEmpty()) {
+ tags->Set(pair.first, value);
+ }
+ }
+ }
+
+ tmpl->Set("tags", tags);
+ }
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ if (perfdata) {
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ Dictionary::Ptr fields = new Dictionary();
+ fields->Set("value", pdv->GetValue());
+
+ if (GetEnableSendThresholds()) {
+ if (!pdv->GetCrit().IsEmpty())
+ fields->Set("crit", pdv->GetCrit());
+ if (!pdv->GetWarn().IsEmpty())
+ fields->Set("warn", pdv->GetWarn());
+ if (!pdv->GetMin().IsEmpty())
+ fields->Set("min", pdv->GetMin());
+ if (!pdv->GetMax().IsEmpty())
+ fields->Set("max", pdv->GetMax());
+ }
+ if (!pdv->GetUnit().IsEmpty()) {
+ fields->Set("unit", pdv->GetUnit());
+ }
+
+ SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
+ }
+ }
+
+ if (GetEnableSendMetadata()) {
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service)
+ fields->Set("state", new InfluxdbInteger(service->GetState()));
+ else
+ fields->Set("state", new InfluxdbInteger(host->GetState()));
+
+ fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
+ fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
+ fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
+ fields->Set("reachable", checkable->IsReachable());
+ fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
+ fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
+ fields->Set("latency", cr->CalculateLatency());
+ fields->Set("execution_time", cr->CalculateExecutionTime());
+
+ SendMetric(checkable, tmpl, Empty, fields, ts);
+ }
+}
+
+String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str)
+{
+ // Iterate over the key name and escape commas and spaces with a backslash
+ String result = str;
+ boost::algorithm::replace_all(result, "\"", "\\\"");
+ boost::algorithm::replace_all(result, "=", "\\=");
+ boost::algorithm::replace_all(result, ",", "\\,");
+ boost::algorithm::replace_all(result, " ", "\\ ");
+
+ // InfluxDB 'feature': although backslashes are allowed in keys they also act
+ // as escape sequences when followed by ',' or ' '. When your tag is like
+ // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
+ // and through experimentation they also escape '='. To be safe we replace
+ // trailing backslashes with and underscore.
+ // See https://github.com/influxdata/influxdb/issues/8587 for more info
+ size_t length = result.GetLength();
+ if (result[length - 1] == '\\')
+ result[length - 1] = '_';
+
+ return result;
+}
+
+String InfluxdbCommonWriter::EscapeValue(const Value& value)
+{
+ if (value.IsObjectType()) {
+ std::ostringstream os;
+ os << static_cast(value)->GetValue() << "i";
+ return os.str();
+ }
+
+ if (value.IsBoolean())
+ return value ? "true" : "false";
+
+ if (value.IsString())
+ return "\"" + EscapeKeyOrTagValue(value) + "\"";
+
+ return value;
+}
+
+void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
+ const String& label, const Dictionary::Ptr& fields, double ts)
+{
+ std::ostringstream msgbuf;
+ msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
+
+ Dictionary::Ptr tags = tmpl->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ // Empty macro expansion, no tag
+ if (!pair.second.IsEmpty()) {
+ msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
+ }
+ }
+ }
+
+ // Label may be empty in the case of metadata
+ if (!label.IsEmpty())
+ msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
+
+ msgbuf << " ";
+
+ {
+ bool first = true;
+
+ ObjectLock fieldLock(fields);
+ for (const Dictionary::Pair& pair : fields) {
+ if (first)
+ first = false;
+ else
+ msgbuf << ",";
+
+ msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
+ }
+ }
+
+ msgbuf << " " << static_cast(ts);
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'.";
+
+ // Buffer the data point
+ m_DataBuffer.emplace_back(msgbuf.str());
+
+ // Flush if we've buffered too much to prevent excessive memory use
+ if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) {
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
+
+ try {
+ Flush();
+ } catch (...) {
+ /* Do nothing. */
+ }
+ }
+}
+
+void InfluxdbCommonWriter::FlushTimeout()
+{
+ m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
+}
+
+void InfluxdbCommonWriter::FlushTimeoutWQ()
+{
+ AssertOnWorkQueue();
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Timer expired writing " << m_DataBuffer.size() << " data points";
+
+ Flush();
+}
+
+void InfluxdbCommonWriter::Flush()
+{
+ namespace beast = boost::beast;
+ namespace http = beast::http;
+
+ /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
+ if (m_DataBuffer.empty())
+ return;
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Flushing data buffer to InfluxDB.";
+
+ String body = boost::algorithm::join(m_DataBuffer, "\n");
+ m_DataBuffer.clear();
+
+ OptionalTlsStream stream;
+
+ try {
+ stream = Connect();
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "InfluxDbWriter")
+ << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
+ return;
+ }
+
+ Defer s ([&stream]() {
+ if (stream.first) {
+ stream.first->next_layer().shutdown();
+ }
+ });
+
+ auto request (AssembleRequest(std::move(body)));
+
+ try {
+ if (stream.first) {
+ http::write(*stream.first, request);
+ stream.first->flush();
+ } else {
+ http::write(*stream.second, request);
+ stream.second->flush();
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ http::parser parser;
+ beast::flat_buffer buf;
+
+ try {
+ if (stream.first) {
+ http::read(*stream.first, buf, parser);
+ } else {
+ http::read(*stream.second, buf, parser);
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
+ throw;
+ }
+
+ auto& response (parser.get());
+
+ if (response.result() != http::status::no_content) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unexpected response code: " << response.result();
+
+ auto& contentType (response[http::field::content_type]);
+ if (contentType != "application/json") {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unexpected Content-Type: " << contentType;
+ return;
+ }
+
+ Dictionary::Ptr jsonResponse;
+ auto& body (response.body());
+
+ try {
+ jsonResponse = JsonDecode(body);
+ } catch (...) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unable to parse JSON response:\n" << body;
+ return;
+ }
+
+ String error = jsonResponse->Get("error");
+
+ Log(LogCritical, GetReflectionType()->GetName())
+ << "InfluxDB error message:\n" << error;
+ }
+}
+
+boost::beast::http::request InfluxdbCommonWriter::AssembleBaseRequest(String body)
+{
+ namespace http = boost::beast::http;
+
+ auto url (AssembleUrl());
+ http::request request (http::verb::post, std::string(url->Format(true)), 10);
+
+ request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+ request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+ request.body() = std::move(body);
+ request.content_length(request.body().size());
+
+ return std::move(request);
+}
+
+Url::Ptr InfluxdbCommonWriter::AssembleBaseUrl()
+{
+ Url::Ptr url = new Url();
+
+ url->SetScheme(GetSslEnable() ? "https" : "http");
+ url->SetHost(GetHost());
+ url->SetPort(GetPort());
+ url->AddQueryElement("precision", "s");
+
+ return std::move(url);
+}
+
+void InfluxdbCommonWriter::ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl::ValidateHostTemplate(lvalue, utils);
+
+ String measurement = lvalue()->Get("measurement");
+ if (!MacroProcessor::ValidateMacroString(measurement))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
+
+void InfluxdbCommonWriter::ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl::ValidateServiceTemplate(lvalue, utils);
+
+ String measurement = lvalue()->Get("measurement");
+ if (!MacroProcessor::ValidateMacroString(measurement))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
+
diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp
new file mode 100644
index 000000000..06e860841
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.hpp
@@ -0,0 +1,99 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#ifndef INFLUXDBCOMMONWRITER_H
+#define INFLUXDBCOMMONWRITER_H
+
+#include "perfdata/influxdbcommonwriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/timer.hpp"
+#include "base/tlsstream.hpp"
+#include "base/workqueue.hpp"
+#include "remote/url.hpp"
+#include
+#include
+#include
+
+namespace icinga
+{
+
+/**
+ * Common base class for InfluxDB v1/v2 writers.
+ *
+ * @ingroup perfdata
+ */
+class InfluxdbCommonWriter : public ObjectImpl
+{
+public:
+ DECLARE_OBJECT(InfluxdbCommonWriter);
+
+ template
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ void ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) override;
+ void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ WorkQueue m_WorkQueue{10000000, 1};
+ std::vector m_DataBuffer;
+
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+ boost::beast::http::request AssembleBaseRequest(String body);
+ Url::Ptr AssembleBaseUrl();
+ virtual boost::beast::http::request AssembleRequest(String body) = 0;
+ virtual Url::Ptr AssembleUrl() = 0;
+
+private:
+ Timer::Ptr m_FlushTimer;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
+ const String& label, const Dictionary::Ptr& fields, double ts);
+ void FlushTimeout();
+ void FlushTimeoutWQ();
+ void Flush();
+
+ static String EscapeKeyOrTagValue(const String& str);
+ static String EscapeValue(const Value& value);
+
+ OptionalTlsStream Connect();
+
+ void AssertOnWorkQueue();
+
+ void ExceptionHandler(boost::exception_ptr exp);
+};
+
+template
+void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+ auto typeName (InfluxWriter::TypeInstance->GetName().ToLower());
+
+ for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType()) {
+ size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+ size_t dataBufferItems = influxwriter->m_DataBuffer.size();
+
+ nodes.emplace_back(influxwriter->GetName(), new Dictionary({
+ { "work_queue_items", workQueueItems },
+ { "work_queue_item_rate", workQueueItemRate },
+ { "data_buffer_items", dataBufferItems }
+ }));
+
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems));
+ }
+
+ status->Set(typeName, new Dictionary(std::move(nodes)));
+}
+
+}
+
+#endif /* INFLUXDBCOMMONWRITER_H */
diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti
new file mode 100644
index 000000000..7eb26dac9
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.ti
@@ -0,0 +1,85 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+abstract class InfluxdbCommonWriter : ConfigObject
+{
+ [config, required] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config, required] String port {
+ default {{{ return "8086"; }}}
+ };
+ [config] bool ssl_enable {
+ default {{{ return false; }}}
+ };
+ [config] String ssl_ca_cert {
+ default {{{ return ""; }}}
+ };
+ [config] String ssl_cert {
+ default {{{ return ""; }}}
+ };
+ [config] String ssl_key{
+ default {{{ return ""; }}}
+ };
+ [config, required] Dictionary::Ptr host_template {
+ default {{{
+ return new Dictionary({
+ { "measurement", "$host.check_command$" },
+ { "tags", new Dictionary({
+ { "hostname", "$host.name$" }
+ }) }
+ });
+ }}}
+ };
+ [config, required] Dictionary::Ptr service_template {
+ default {{{
+ return new Dictionary({
+ { "measurement", "$service.check_command$" },
+ { "tags", new Dictionary({
+ { "hostname", "$host.name$" },
+ { "service", "$service.name$" }
+ }) }
+ });
+ }}}
+ };
+ [config] bool enable_send_thresholds {
+ default {{{ return false; }}}
+ };
+ [config] bool enable_send_metadata {
+ default {{{ return false; }}}
+ };
+ [config] int flush_interval {
+ default {{{ return 10; }}}
+ };
+ [config] int flush_threshold {
+ default {{{ return 1024; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+};
+
+validator InfluxdbCommonWriter {
+ Dictionary host_template {
+ required measurement;
+ String measurement;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+ Dictionary service_template {
+ required measurement;
+ String measurement;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+};
+
+}
diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp
index cde7a3653..30240f7f4 100644
--- a/lib/perfdata/influxdbwriter.cpp
+++ b/lib/perfdata/influxdbwriter.cpp
@@ -2,618 +2,55 @@
#include "perfdata/influxdbwriter.hpp"
#include "perfdata/influxdbwriter-ti.cpp"
-#include "remote/url.hpp"
-#include "icinga/service.hpp"
-#include "icinga/macroprocessor.hpp"
-#include "icinga/icingaapplication.hpp"
-#include "icinga/checkcommand.hpp"
-#include "base/application.hpp"
-#include "base/defer.hpp"
-#include "base/io-engine.hpp"
-#include "base/tcpsocket.hpp"
-#include "base/configtype.hpp"
-#include "base/objectlock.hpp"
-#include "base/logger.hpp"
-#include "base/convert.hpp"
-#include "base/utility.hpp"
-#include "base/perfdatavalue.hpp"
-#include "base/stream.hpp"
-#include "base/json.hpp"
#include "base/base64.hpp"
-#include "base/networkstream.hpp"
-#include "base/exception.hpp"
+#include "remote/url.hpp"
+#include "base/configtype.hpp"
+#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
-#include "base/tlsutility.hpp"
-#include
-#include
-#include
-#include
-#include
#include
-#include
-#include
-#include
#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
#include
using namespace icinga;
-class InfluxdbInteger final : public Object
-{
-public:
- DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
-
- InfluxdbInteger(int value)
- : m_Value(value)
- { }
-
- int GetValue() const
- {
- return m_Value;
- }
-
-private:
- int m_Value;
-};
-
REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
-void InfluxdbWriter::OnConfigLoaded()
-{
- ObjectImpl::OnConfigLoaded();
-
- m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
-
- if (!GetEnableHa()) {
- Log(LogDebug, "InfluxdbWriter")
- << "HA functionality disabled. Won't pause connection: " << GetName();
-
- SetHAMode(HARunEverywhere);
- } else {
- SetHAMode(HARunOnce);
- }
-}
-
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
- DictionaryData nodes;
+ InfluxdbCommonWriter::StatsFunc(status, perfdata);
+}
- for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) {
- size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
- double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
- size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
+boost::beast::http::request InfluxdbWriter::AssembleRequest(String body)
+{
+ auto request (AssembleBaseRequest(std::move(body)));
+ Dictionary::Ptr basicAuth = GetBasicAuth();
- nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({
- { "work_queue_items", workQueueItems },
- { "work_queue_item_rate", workQueueItemRate },
- { "data_buffer_items", dataBufferItems }
- }));
-
- perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems));
- perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
- perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems));
+ if (basicAuth) {
+ request.set(
+ boost::beast::http::field::authorization,
+ "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password"))
+ );
}
- status->Set("influxdbwriter", new Dictionary(std::move(nodes)));
+ return std::move(request);
}
-void InfluxdbWriter::Resume()
+Url::Ptr InfluxdbWriter::AssembleUrl()
{
- ObjectImpl::Resume();
-
- Log(LogInformation, "InfluxdbWriter")
- << "'" << GetName() << "' resumed.";
-
- /* Register exception handler for WQ tasks. */
- m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
-
- /* Setup timer for periodically flushing m_DataBuffer */
- m_FlushTimer = new Timer();
- m_FlushTimer->SetInterval(GetFlushInterval());
- m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
- m_FlushTimer->Start();
- m_FlushTimer->Reschedule(0);
-
- /* Register for new metrics. */
- Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
- CheckResultHandler(checkable, cr);
- });
-}
-
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
-void InfluxdbWriter::Pause()
-{
- /* Force a flush. */
- Log(LogDebug, "InfluxdbWriter")
- << "Flushing pending data buffers.";
-
- Flush();
-
- /* Work on the missing tasks. TODO: Find a way to cache them on disk. */
- Log(LogDebug, "InfluxdbWriter")
- << "Joining existing WQ tasks.";
-
- m_WorkQueue.Join();
-
- /* Flush again after the WQ tasks have filled the data buffer. */
- Log(LogDebug, "InfluxdbWriter")
- << "Flushing data buffers from WQ tasks.";
-
- Flush();
-
- Log(LogInformation, "InfluxdbWriter")
- << "'" << GetName() << "' paused.";
-
- ObjectImpl::Pause();
-}
-
-void InfluxdbWriter::AssertOnWorkQueue()
-{
- ASSERT(m_WorkQueue.IsWorkerThread());
-}
-
-void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
-{
- Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
-
- Log(LogDebug, "InfluxdbWriter")
- << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
-
- //TODO: Close the connection, if we keep it open.
-}
-
-OptionalTlsStream InfluxdbWriter::Connect()
-{
- Log(LogNotice, "InfluxdbWriter")
- << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
-
- OptionalTlsStream stream;
- bool ssl = GetSslEnable();
-
- if (ssl) {
- Shared::Ptr sslContext;
-
- try {
- sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "Unable to create SSL context.";
- throw;
- }
-
- stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
-
- } else {
- stream.second = Shared::Make(IoEngine::Get().GetIoContext());
- }
-
- try {
- icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
- throw;
- }
-
- if (ssl) {
- auto& tlsStream (stream.first->next_layer());
-
- try {
- tlsStream.handshake(tlsStream.client);
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "TLS handshake with host '" << GetHost() << "' failed.";
- throw;
- }
- }
-
- return std::move(stream);
-}
-
-void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
-{
- if (IsPaused())
- return;
-
- m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
-}
-
-void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
-{
- AssertOnWorkQueue();
-
- CONTEXT("Processing check result for '" + checkable->GetName() + "'");
-
- if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
- return;
-
- Host::Ptr host;
- Service::Ptr service;
- tie(host, service) = GetHostService(checkable);
-
- MacroProcessor::ResolverList resolvers;
- if (service)
- resolvers.emplace_back("service", service);
- resolvers.emplace_back("host", host);
- resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
-
- String prefix;
-
- double ts = cr->GetExecutionEnd();
-
- // Clone the template and perform an in-place macro expansion of measurement and tag values
- Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
- Dictionary::Ptr tmpl = static_pointer_cast(tmpl_clean->ShallowClone());
- tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
-
- Dictionary::Ptr tagsClean = tmpl->Get("tags");
- if (tagsClean) {
- Dictionary::Ptr tags = new Dictionary();
-
- {
- ObjectLock olock(tagsClean);
- for (const Dictionary::Pair& pair : tagsClean) {
- String missing_macro;
- Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
-
- if (missing_macro.IsEmpty()) {
- tags->Set(pair.first, value);
- }
- }
- }
-
- tmpl->Set("tags", tags);
- }
-
- CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
-
- Array::Ptr perfdata = cr->GetPerformanceData();
-
- if (perfdata) {
- ObjectLock olock(perfdata);
- for (const Value& val : perfdata) {
- PerfdataValue::Ptr pdv;
-
- if (val.IsObjectType())
- pdv = val;
- else {
- try {
- pdv = PerfdataValue::Parse(val);
- } catch (const std::exception&) {
- Log(LogWarning, "InfluxdbWriter")
- << "Ignoring invalid perfdata for checkable '"
- << checkable->GetName() << "' and command '"
- << checkCommand->GetName() << "' with value: " << val;
- continue;
- }
- }
-
- Dictionary::Ptr fields = new Dictionary();
- fields->Set("value", pdv->GetValue());
-
- if (GetEnableSendThresholds()) {
- if (!pdv->GetCrit().IsEmpty())
- fields->Set("crit", pdv->GetCrit());
- if (!pdv->GetWarn().IsEmpty())
- fields->Set("warn", pdv->GetWarn());
- if (!pdv->GetMin().IsEmpty())
- fields->Set("min", pdv->GetMin());
- if (!pdv->GetMax().IsEmpty())
- fields->Set("max", pdv->GetMax());
- }
- if (!pdv->GetUnit().IsEmpty()) {
- fields->Set("unit", pdv->GetUnit());
- }
-
- SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
- }
- }
-
- if (GetEnableSendMetadata()) {
- Host::Ptr host;
- Service::Ptr service;
- tie(host, service) = GetHostService(checkable);
-
- Dictionary::Ptr fields = new Dictionary();
-
- if (service)
- fields->Set("state", new InfluxdbInteger(service->GetState()));
- else
- fields->Set("state", new InfluxdbInteger(host->GetState()));
-
- fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
- fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
- fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
- fields->Set("reachable", checkable->IsReachable());
- fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
- fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
- fields->Set("latency", cr->CalculateLatency());
- fields->Set("execution_time", cr->CalculateExecutionTime());
-
- SendMetric(checkable, tmpl, Empty, fields, ts);
- }
-}
-
-String InfluxdbWriter::EscapeKeyOrTagValue(const String& str)
-{
- // Iterate over the key name and escape commas and spaces with a backslash
- String result = str;
- boost::algorithm::replace_all(result, "\"", "\\\"");
- boost::algorithm::replace_all(result, "=", "\\=");
- boost::algorithm::replace_all(result, ",", "\\,");
- boost::algorithm::replace_all(result, " ", "\\ ");
-
- // InfluxDB 'feature': although backslashes are allowed in keys they also act
- // as escape sequences when followed by ',' or ' '. When your tag is like
- // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
- // and through experimentation they also escape '='. To be safe we replace
- // trailing backslashes with and underscore.
- // See https://github.com/influxdata/influxdb/issues/8587 for more info
- size_t length = result.GetLength();
- if (result[length - 1] == '\\')
- result[length - 1] = '_';
-
- return result;
-}
-
-String InfluxdbWriter::EscapeValue(const Value& value)
-{
- if (value.IsObjectType()) {
- std::ostringstream os;
- os << static_cast(value)->GetValue() << "i";
- return os.str();
- }
-
- if (value.IsBoolean())
- return value ? "true" : "false";
-
- if (value.IsString())
- return "\"" + EscapeKeyOrTagValue(value) + "\"";
-
- return value;
-}
-
-void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
- const String& label, const Dictionary::Ptr& fields, double ts)
-{
- std::ostringstream msgbuf;
- msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
-
- Dictionary::Ptr tags = tmpl->Get("tags");
- if (tags) {
- ObjectLock olock(tags);
- for (const Dictionary::Pair& pair : tags) {
- // Empty macro expansion, no tag
- if (!pair.second.IsEmpty()) {
- msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
- }
- }
- }
-
- // Label may be empty in the case of metadata
- if (!label.IsEmpty())
- msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
-
- msgbuf << " ";
-
- {
- bool first = true;
-
- ObjectLock fieldLock(fields);
- for (const Dictionary::Pair& pair : fields) {
- if (first)
- first = false;
- else
- msgbuf << ",";
-
- msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
- }
- }
-
- msgbuf << " " << static_cast(ts);
-
- Log(LogDebug, "InfluxdbWriter")
- << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'.";
-
- // Buffer the data point
- m_DataBuffer.emplace_back(msgbuf.str());
-
- // Flush if we've buffered too much to prevent excessive memory use
- if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) {
- Log(LogDebug, "InfluxdbWriter")
- << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
-
- try {
- Flush();
- } catch (...) {
- /* Do nothing. */
- }
- }
-}
-
-void InfluxdbWriter::FlushTimeout()
-{
- m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
-}
-
-void InfluxdbWriter::FlushTimeoutWQ()
-{
- AssertOnWorkQueue();
-
- Log(LogDebug, "InfluxdbWriter")
- << "Timer expired writing " << m_DataBuffer.size() << " data points";
-
- Flush();
-}
-
-void InfluxdbWriter::Flush()
-{
- namespace beast = boost::beast;
- namespace http = beast::http;
-
- /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
- if (m_DataBuffer.empty())
- return;
-
- Log(LogDebug, "InfluxdbWriter")
- << "Flushing data buffer to InfluxDB.";
-
- String body = boost::algorithm::join(m_DataBuffer, "\n");
- m_DataBuffer.clear();
-
- OptionalTlsStream stream;
-
- try {
- stream = Connect();
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxDbWriter")
- << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
- return;
- }
-
- Defer s ([&stream]() {
- if (stream.first) {
- stream.first->next_layer().shutdown();
- }
- });
-
- Url::Ptr url = new Url();
- url->SetScheme(GetSslEnable() ? "https" : "http");
- url->SetHost(GetHost());
- url->SetPort(GetPort());
+ auto url (AssembleBaseUrl());
std::vector path;
path.emplace_back("write");
url->SetPath(path);
url->AddQueryElement("db", GetDatabase());
- url->AddQueryElement("precision", "s");
+
if (!GetUsername().IsEmpty())
url->AddQueryElement("u", GetUsername());
if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword());
- http::request request (http::verb::post, std::string(url->Format(true)), 10);
-
- request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
- request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
-
- {
- Dictionary::Ptr basicAuth = GetBasicAuth();
-
- if (basicAuth) {
- request.set(
- http::field::authorization,
- "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password"))
- );
- }
- }
-
- request.body() = body;
- request.content_length(request.body().size());
-
- try {
- if (stream.first) {
- http::write(*stream.first, request);
- stream.first->flush();
- } else {
- http::write(*stream.second, request);
- stream.second->flush();
- }
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
- throw;
- }
-
- http::parser parser;
- beast::flat_buffer buf;
-
- try {
- if (stream.first) {
- http::read(*stream.first, buf, parser);
- } else {
- http::read(*stream.second, buf, parser);
- }
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
- throw;
- }
-
- auto& response (parser.get());
-
- if (response.result() != http::status::no_content) {
- Log(LogWarning, "InfluxdbWriter")
- << "Unexpected response code: " << response.result();
-
- auto& contentType (response[http::field::content_type]);
- if (contentType != "application/json") {
- Log(LogWarning, "InfluxdbWriter")
- << "Unexpected Content-Type: " << contentType;
- return;
- }
-
- Dictionary::Ptr jsonResponse;
- auto& body (response.body());
-
- try {
- jsonResponse = JsonDecode(body);
- } catch (...) {
- Log(LogWarning, "InfluxdbWriter")
- << "Unable to parse JSON response:\n" << body;
- return;
- }
-
- String error = jsonResponse->Get("error");
-
- Log(LogCritical, "InfluxdbWriter")
- << "InfluxDB error message:\n" << error;
- }
+ return std::move(url);
}
-
-void InfluxdbWriter::ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils)
-{
- ObjectImpl::ValidateHostTemplate(lvalue, utils);
-
- String measurement = lvalue()->Get("measurement");
- if (!MacroProcessor::ValidateMacroString(measurement))
- BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
-
- Dictionary::Ptr tags = lvalue()->Get("tags");
- if (tags) {
- ObjectLock olock(tags);
- for (const Dictionary::Pair& pair : tags) {
- if (!MacroProcessor::ValidateMacroString(pair.second))
- BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
- }
- }
-}
-
-void InfluxdbWriter::ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils)
-{
- ObjectImpl::ValidateServiceTemplate(lvalue, utils);
-
- String measurement = lvalue()->Get("measurement");
- if (!MacroProcessor::ValidateMacroString(measurement))
- BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
-
- Dictionary::Ptr tags = lvalue()->Get("tags");
- if (tags) {
- ObjectLock olock(tags);
- for (const Dictionary::Pair& pair : tags) {
- if (!MacroProcessor::ValidateMacroString(pair.second))
- BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
- }
- }
-}
-
diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp
index 1f7ab8309..48676cc97 100644
--- a/lib/perfdata/influxdbwriter.hpp
+++ b/lib/perfdata/influxdbwriter.hpp
@@ -4,19 +4,12 @@
#define INFLUXDBWRITER_H
#include "perfdata/influxdbwriter-ti.hpp"
-#include "icinga/service.hpp"
-#include "base/configobject.hpp"
-#include "base/tcpsocket.hpp"
-#include "base/timer.hpp"
-#include "base/tlsstream.hpp"
-#include "base/workqueue.hpp"
-#include
namespace icinga
{
/**
- * An Icinga InfluxDB writer.
+ * An Icinga InfluxDB v1 writer.
*
* @ingroup perfdata
*/
@@ -28,35 +21,9 @@ public:
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
- void ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) override;
- void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override;
-
protected:
- void OnConfigLoaded() override;
- void Resume() override;
- void Pause() override;
-
-private:
- WorkQueue m_WorkQueue{10000000, 1};
- Timer::Ptr m_FlushTimer;
- std::vector m_DataBuffer;
-
- void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
- void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
- void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
- const String& label, const Dictionary::Ptr& fields, double ts);
- void FlushTimeout();
- void FlushTimeoutWQ();
- void Flush();
-
- static String EscapeKeyOrTagValue(const String& str);
- static String EscapeValue(const Value& value);
-
- OptionalTlsStream Connect();
-
- void AssertOnWorkQueue();
-
- void ExceptionHandler(boost::exception_ptr exp);
+ boost::beast::http::request AssembleRequest(String body) override;
+ Url::Ptr AssembleUrl() override;
};
}
diff --git a/lib/perfdata/influxdbwriter.ti b/lib/perfdata/influxdbwriter.ti
index 73a340cc3..e6fc84e5c 100644
--- a/lib/perfdata/influxdbwriter.ti
+++ b/lib/perfdata/influxdbwriter.ti
@@ -1,22 +1,16 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
-#include "base/configobject.hpp"
+#include "perfdata/influxdbcommonwriter.hpp"
library perfdata;
namespace icinga
{
-class InfluxdbWriter : ConfigObject
+class InfluxdbWriter : InfluxdbCommonWriter
{
activation_priority 100;
- [config, required] String host {
- default {{{ return "127.0.0.1"; }}}
- };
- [config, required] String port {
- default {{{ return "8086"; }}}
- };
[config, required] String database {
default {{{ return "icinga2"; }}}
};
@@ -26,72 +20,10 @@ class InfluxdbWriter : ConfigObject
[config, no_user_view] String password {
default {{{ return ""; }}}
};
- [config] bool ssl_enable {
- default {{{ return false; }}}
- };
- [config] String ssl_ca_cert {
- default {{{ return ""; }}}
- };
- [config] String ssl_cert {
- default {{{ return ""; }}}
- };
- [config] String ssl_key{
- default {{{ return ""; }}}
- };
[config, no_user_view] Dictionary::Ptr basic_auth;
- [config, required] Dictionary::Ptr host_template {
- default {{{
- return new Dictionary({
- { "measurement", "$host.check_command$" },
- { "tags", new Dictionary({
- { "hostname", "$host.name$" }
- }) }
- });
- }}}
- };
- [config, required] Dictionary::Ptr service_template {
- default {{{
- return new Dictionary({
- { "measurement", "$service.check_command$" },
- { "tags", new Dictionary({
- { "hostname", "$host.name$" },
- { "service", "$service.name$" }
- }) }
- });
- }}}
- };
- [config] bool enable_send_thresholds {
- default {{{ return false; }}}
- };
- [config] bool enable_send_metadata {
- default {{{ return false; }}}
- };
- [config] int flush_interval {
- default {{{ return 10; }}}
- };
- [config] int flush_threshold {
- default {{{ return 1024; }}}
- };
- [config] bool enable_ha {
- default {{{ return false; }}}
- };
};
validator InfluxdbWriter {
- Dictionary host_template {
- required measurement;
- String measurement;
- Dictionary "tags" {
- String "*";
- };
- };
- Dictionary service_template {
- required measurement;
- String measurement;
- Dictionary "tags" {
- String "*";
- };
- };
Dictionary basic_auth {
required username;
String username;
diff --git a/tools/syntax/vim/syntax/icinga2.vim b/tools/syntax/vim/syntax/icinga2.vim
index 59ff202f1..3cbb8e632 100644
--- a/tools/syntax/vim/syntax/icinga2.vim
+++ b/tools/syntax/vim/syntax/icinga2.vim
@@ -57,7 +57,7 @@ syn keyword icinga2ObjType Comment Dependency Downtime ElasticsearchWriter
syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener
syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup
syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection
-syn keyword icinga2ObjType InfluxdbWriter LivestatusListener Notification NotificationCommand
+syn keyword icinga2ObjType InfluxdbWriter Influxdb2Writer LivestatusListener Notification NotificationCommand
syn keyword icinga2ObjType NotificationComponent OpenTsdbWriter PerfdataWriter
syn keyword icinga2ObjType ScheduledDowntime Service ServiceGroup SyslogLogger
syn keyword icinga2ObjType TimePeriod User UserGroup WindowsEventLogLogger Zone