mirror of https://github.com/Icinga/icinga2.git
Add InfluxDB Writer
Adds an Icinga2 object to directly interface with InfluxDB's native HTTP API. This supports optional basic authorization, and TLS transport. InfluxDB didn't appear to like having the TLS stream kept open, so instead this object buffers data points which are then flushed to InfluxDB as a batch write, either driven by a configurable timeout or threshold. As InfluxDB is a schema-less database the host and service templates are user configurable allowing both the measurement field and tags to be set by the end user via macro expansion. This allows access to tag fields from arbitrary data associated with host.vars or service.vars. If a particular value is unable to be resolved, the tag will be dropped and not transmitted to InfluxDB. Also alters URL handling to omit array brackets when only a single value is attached to a key, otherwise InfluxDB has a strop with non-standard syntax. fixes #10480 Signed-off-by: Michael Friedrich <michael.friedrich@netways.de>
This commit is contained in:
parent
2faf121ff4
commit
79c1e883d1
1
AUTHORS
1
AUTHORS
|
@ -78,6 +78,7 @@ Rune Darrud <theflyingcorpse@gmail.com>
|
||||||
Sam Kottler <shk@linux.com>
|
Sam Kottler <shk@linux.com>
|
||||||
Sebastian Brückner <mail@invlid.com>
|
Sebastian Brückner <mail@invlid.com>
|
||||||
Sebastian Chrostek <sebastian@chrostek.net>
|
Sebastian Chrostek <sebastian@chrostek.net>
|
||||||
|
Simon Murray <spjmurray@yahoo.co.uk>
|
||||||
Simon Ruderich <simon@ruderich.org>
|
Simon Ruderich <simon@ruderich.org>
|
||||||
Siyalrach Anton Thomas <sat@level8.dk>
|
Siyalrach Anton Thomas <sat@level8.dk>
|
||||||
Stefan Triep <stefan@triep.net>
|
Stefan Triep <stefan@triep.net>
|
||||||
|
|
|
@ -27,7 +27,7 @@ Configure npcd to use the performance data created by Icinga 2:
|
||||||
Set `perfdata_spool_dir = /var/spool/icinga2/perfdata` and restart the `npcd` daemon.
|
Set `perfdata_spool_dir = /var/spool/icinga2/perfdata` and restart the `npcd` daemon.
|
||||||
|
|
||||||
There's also an Icinga Web 2 module for direct PNP graph integration
|
There's also an Icinga Web 2 module for direct PNP graph integration
|
||||||
available at https://exchange.icinga.org/icinga/PNP4Nagios
|
available at [Icinga Exchange](https://exchange.icinga.org/icinga/PNP).
|
||||||
|
|
||||||
More information on [action_url as attribute](14-addons-plugins.md#addons-graphing-pnp-action-url)
|
More information on [action_url as attribute](14-addons-plugins.md#addons-graphing-pnp-action-url)
|
||||||
and [graph template names](14-addons-plugins.md#addons-graphing-pnp-custom-templates).
|
and [graph template names](14-addons-plugins.md#addons-graphing-pnp-custom-templates).
|
||||||
|
@ -59,11 +59,10 @@ A popular alternative frontend for Graphite is for example [Grafana](http://graf
|
||||||
[InfluxDB](https://influxdb.com) is a time series, metrics, and analytics database.
|
[InfluxDB](https://influxdb.com) is a time series, metrics, and analytics database.
|
||||||
It’s written in Go and has no external dependencies.
|
It’s written in Go and has no external dependencies.
|
||||||
|
|
||||||
Use the [GraphiteWriter](15-features.md#graphite-carbon-cache-writer) feature
|
Use the [InfluxdbWriter](15-features.md#influxdb-writer) feature
|
||||||
for sending real-time metrics from Icinga 2 to InfluxDB. Note: There are [API changes](https://github.com/influxdb/influxdb/issues/2102)
|
for sending real-time metrics from Icinga 2 to InfluxDB.
|
||||||
in InfluxDB 0.9.x.
|
|
||||||
|
|
||||||
# icinga2 feature enable graphite
|
# icinga2 feature enable influxdb
|
||||||
|
|
||||||
A popular frontend for InfluxDB is for example [Grafana](http://grafana.org).
|
A popular frontend for InfluxDB is for example [Grafana](http://grafana.org).
|
||||||
|
|
||||||
|
|
|
@ -311,6 +311,20 @@ Cache. Please make sure that the order is correct because the first match wins.
|
||||||
pattern = ^icinga\.
|
pattern = ^icinga\.
|
||||||
retentions = 1m:2d,5m:10d,30m:90d,360m:4y
|
retentions = 1m:2d,5m:10d,30m:90d,360m:4y
|
||||||
|
|
||||||
|
### <a id="influxdb-writer"></a> InfluxDB Writer
|
||||||
|
|
||||||
|
Once there are new metrics available, Icinga 2 will directly write them to the
|
||||||
|
defined InfluxDB HTTP API.
|
||||||
|
|
||||||
|
You can enable the feature using
|
||||||
|
|
||||||
|
# icinga2 feature enable influxdb
|
||||||
|
|
||||||
|
By default the [InfluxdbWriter](6-object-types.md#objecttype-influxdbwriter) feature
|
||||||
|
expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
|
||||||
|
|
||||||
|
More configuration details can be found [here](6-object-types.md#objecttype-influxdbwriter).
|
||||||
|
|
||||||
### <a id="gelfwriter"></a> GELF Writer
|
### <a id="gelfwriter"></a> GELF Writer
|
||||||
|
|
||||||
The `Graylog Extended Log Format` (short: [GELF](http://www.graylog2.org/resources/gelf))
|
The `Graylog Extended Log Format` (short: [GELF](http://www.graylog2.org/resources/gelf))
|
||||||
|
|
|
@ -830,6 +830,60 @@ External interfaces like Icinga Web 2 require everything except `DbCatCheck`
|
||||||
which is the default value if `categories` is not set.
|
which is the default value if `categories` is not set.
|
||||||
|
|
||||||
|
|
||||||
|
## <a id="objecttype-influxdbwriter"></a> InfluxdbWriter
|
||||||
|
|
||||||
|
Writes check result metrics and performance data to a defined InfluxDB host.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
library "perfdata"
|
||||||
|
|
||||||
|
object InfluxdbWriter "influxdb" {
|
||||||
|
host = "127.0.0.1"
|
||||||
|
port = 8086
|
||||||
|
database = "icinga2"
|
||||||
|
host_template = {
|
||||||
|
measurement = "$host.check_command$"
|
||||||
|
tags = {
|
||||||
|
hostname = "$host.name$"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service_template = {
|
||||||
|
measurement = "$service.check_command$"
|
||||||
|
tags = {
|
||||||
|
hostname = "$host.name$"
|
||||||
|
service = "$service.name$"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Measurement names and tags are fully configurable by the end user. The InfluxdbWriter
|
||||||
|
object will automatically add a `metric` and `type` tag to each data point. These
|
||||||
|
correlate to perfdata label and perfdata field (value, warn, crit, min, max) respectively.
|
||||||
|
If a value associated with a tag is not able to be resolved it will be dropped and not
|
||||||
|
sent to the target host.
|
||||||
|
|
||||||
|
The database is assumed to exist so this object will make no attempt to create it currently.
|
||||||
|
|
||||||
|
Configuration Attributes:
|
||||||
|
|
||||||
|
Name |Description
|
||||||
|
-----------------------|---------------------------------------------------------------------------------------------------------
|
||||||
|
host | **Required.** InfluxDB host address. Defaults to `127.0.0.1`.
|
||||||
|
port | **Required.** InfluxDB HTTP port. Defaults to `8086`.
|
||||||
|
database | **Required.** InfluxDB database name. Defaults to `icinga2`.
|
||||||
|
username | **Optional.** InfluxDB user name. Defaults to `none`.
|
||||||
|
password | **Optional.** InfluxDB user password. Defaults to `none`.
|
||||||
|
ssl_enable | **Optional.** Whether to use a TLS stream. Defaults to `false`.
|
||||||
|
ssl_ca_cert | **Optional.** CA certificate to validate the remote host.
|
||||||
|
ssl_cert | **Optional.** Host certificate to present to the remote host for mutual verification.
|
||||||
|
ssl_key | **Optional.** Host key to accompany the ssl_cert
|
||||||
|
host_template | **Required.** Host template to define the InfluxDB line protocol.
|
||||||
|
service_template | **Required.** Service template to define the influxDB line protocol.
|
||||||
|
enable_send_thresholds | **Optional.** Whether to send warn, crit, min & max tagged data.
|
||||||
|
flush_interval | **Optional.** How long to buffer data points before transfering to InfluxDB. Defaults to `10s`.
|
||||||
|
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
|
||||||
|
|
||||||
## <a id="objecttype-livestatuslistener"></a> LiveStatusListener
|
## <a id="objecttype-livestatuslistener"></a> LiveStatusListener
|
||||||
|
|
||||||
Livestatus API interface available as TCP or UNIX socket. Historical table queries
|
Livestatus API interface available as TCP or UNIX socket. Historical table queries
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
/**
|
||||||
|
* The InfluxdbWriter type writes check result metrics and
|
||||||
|
* performance data to an InfluxDB HTTP API
|
||||||
|
*/
|
||||||
|
|
||||||
|
library "perfdata"
|
||||||
|
|
||||||
|
object InfluxdbWriter "influxdb" {
|
||||||
|
//host = "127.0.0.1"
|
||||||
|
//port = 8086
|
||||||
|
//database = "icinga2"
|
||||||
|
//host_template = {
|
||||||
|
// measurement = "$host.check_command$"
|
||||||
|
// tags = {
|
||||||
|
// hostname = "$host.name$"
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
//service_template = {
|
||||||
|
// measurement = "$service.check_command$"
|
||||||
|
// tags = {
|
||||||
|
// hostname = "$host.name$"
|
||||||
|
// service = "$service.name$"
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
}
|
|
@ -17,11 +17,12 @@
|
||||||
|
|
||||||
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
|
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
|
||||||
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
|
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
|
||||||
|
mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
|
||||||
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
|
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
|
||||||
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
|
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
|
||||||
|
|
||||||
set(perfdata_SOURCES
|
set(perfdata_SOURCES
|
||||||
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
|
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
|
||||||
)
|
)
|
||||||
|
|
||||||
if(ICINGA2_UNITY_BUILD)
|
if(ICINGA2_UNITY_BUILD)
|
||||||
|
@ -49,6 +50,11 @@ install_if_not_exists(
|
||||||
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
||||||
)
|
)
|
||||||
|
|
||||||
|
install_if_not_exists(
|
||||||
|
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf
|
||||||
|
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
||||||
|
)
|
||||||
|
|
||||||
install_if_not_exists(
|
install_if_not_exists(
|
||||||
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
|
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
|
||||||
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
||||||
|
|
|
@ -0,0 +1,349 @@
|
||||||
|
/******************************************************************************
|
||||||
|
* Icinga 2 *
|
||||||
|
* Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) *
|
||||||
|
* *
|
||||||
|
* This program is free software; you can redistribute it and/or *
|
||||||
|
* modify it under the terms of the GNU General Public License *
|
||||||
|
* as published by the Free Software Foundation; either version 2 *
|
||||||
|
* of the License, or (at your option) any later version. *
|
||||||
|
* *
|
||||||
|
* This program is distributed in the hope that it will be useful, *
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||||
|
* GNU General Public License for more details. *
|
||||||
|
* *
|
||||||
|
* You should have received a copy of the GNU General Public License *
|
||||||
|
* along with this program; if not, write to the Free Software Foundation *
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
#include "perfdata/influxdbwriter.hpp"
|
||||||
|
#include "perfdata/influxdbwriter.tcpp"
|
||||||
|
#include "remote/url.hpp"
|
||||||
|
#include "remote/httprequest.hpp"
|
||||||
|
#include "remote/httpresponse.hpp"
|
||||||
|
#include "icinga/service.hpp"
|
||||||
|
#include "icinga/macroprocessor.hpp"
|
||||||
|
#include "icinga/icingaapplication.hpp"
|
||||||
|
#include "icinga/compatutility.hpp"
|
||||||
|
#include "icinga/perfdatavalue.hpp"
|
||||||
|
#include "icinga/checkcommand.hpp"
|
||||||
|
#include "base/tcpsocket.hpp"
|
||||||
|
#include "base/configtype.hpp"
|
||||||
|
#include "base/objectlock.hpp"
|
||||||
|
#include "base/logger.hpp"
|
||||||
|
#include "base/convert.hpp"
|
||||||
|
#include "base/utility.hpp"
|
||||||
|
#include "base/application.hpp"
|
||||||
|
#include "base/stream.hpp"
|
||||||
|
#include "base/networkstream.hpp"
|
||||||
|
#include "base/exception.hpp"
|
||||||
|
#include "base/statsfunction.hpp"
|
||||||
|
#include "base/tlsutility.hpp"
|
||||||
|
#include <boost/algorithm/string.hpp>
|
||||||
|
#include <boost/algorithm/string/classification.hpp>
|
||||||
|
#include <boost/foreach.hpp>
|
||||||
|
#include <boost/algorithm/string/split.hpp>
|
||||||
|
#include <boost/algorithm/string/replace.hpp>
|
||||||
|
|
||||||
|
using namespace icinga;
|
||||||
|
|
||||||
|
REGISTER_TYPE(InfluxdbWriter);
|
||||||
|
|
||||||
|
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
|
||||||
|
|
||||||
|
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
|
||||||
|
{
|
||||||
|
Dictionary::Ptr nodes = new Dictionary();
|
||||||
|
|
||||||
|
BOOST_FOREACH(const InfluxdbWriter::Ptr& influxdbwriter, ConfigType::GetObjectsByType<InfluxdbWriter>()) {
|
||||||
|
nodes->Set(influxdbwriter->GetName(), 1); //add more stats
|
||||||
|
}
|
||||||
|
|
||||||
|
status->Set("influxdbwriter", nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::Start(bool runtimeCreated)
|
||||||
|
{
|
||||||
|
m_DataBuffer = new Array();
|
||||||
|
|
||||||
|
ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
|
||||||
|
|
||||||
|
m_FlushTimer = new Timer();
|
||||||
|
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||||
|
m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
|
||||||
|
m_FlushTimer->Start();
|
||||||
|
m_FlushTimer->Reschedule(0);
|
||||||
|
|
||||||
|
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||||
|
}
|
||||||
|
|
||||||
|
Stream::Ptr InfluxdbWriter::Connect(void)
|
||||||
|
{
|
||||||
|
TcpSocket::Ptr socket = new TcpSocket();
|
||||||
|
|
||||||
|
Log(LogNotice, "InfluxdbWriter")
|
||||||
|
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
|
||||||
|
try {
|
||||||
|
socket->Connect(GetHost(), GetPort());
|
||||||
|
} catch (std::exception&) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
return Stream::Ptr();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (GetSslEnable()) {
|
||||||
|
boost::shared_ptr<SSL_CTX> ssl_context;
|
||||||
|
try {
|
||||||
|
ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||||
|
} catch (std::exception&) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Unable to create SSL context.";
|
||||||
|
return Stream::Ptr();
|
||||||
|
}
|
||||||
|
|
||||||
|
TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context);
|
||||||
|
try {
|
||||||
|
tls_stream->Handshake();
|
||||||
|
} catch (std::exception&) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
||||||
|
return Stream::Ptr();
|
||||||
|
}
|
||||||
|
|
||||||
|
return tls_stream;
|
||||||
|
} else {
|
||||||
|
return new NetworkStream(socket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||||
|
{
|
||||||
|
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
|
||||||
|
|
||||||
|
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
|
||||||
|
return;
|
||||||
|
|
||||||
|
Host::Ptr host;
|
||||||
|
Service::Ptr service;
|
||||||
|
boost::tie(host, service) = GetHostService(checkable);
|
||||||
|
|
||||||
|
MacroProcessor::ResolverList resolvers;
|
||||||
|
if (service)
|
||||||
|
resolvers.push_back(std::make_pair("service", service));
|
||||||
|
resolvers.push_back(std::make_pair("host", host));
|
||||||
|
resolvers.push_back(std::make_pair("icinga", IcingaApplication::GetInstance()));
|
||||||
|
|
||||||
|
String prefix;
|
||||||
|
|
||||||
|
double ts = cr->GetExecutionEnd();
|
||||||
|
|
||||||
|
// Clone the template and perform an in-place macro expansion of measurement and tag values
|
||||||
|
// Work Needed: Escape ' ', ',' and '=' in field keys, tag keys and tag values
|
||||||
|
// Quote field values when the type is string
|
||||||
|
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
|
||||||
|
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->Clone());
|
||||||
|
tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
|
||||||
|
|
||||||
|
Dictionary::Ptr tags = tmpl->Get("tags");
|
||||||
|
if (tags) {
|
||||||
|
ObjectLock olock(tags);
|
||||||
|
retry:
|
||||||
|
BOOST_FOREACH(const Dictionary::Pair& pair, tags) {
|
||||||
|
// Prevent missing macros from warning; will return an empty value
|
||||||
|
// which will be filtered out in SendMetric()
|
||||||
|
String missing_macro;
|
||||||
|
tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the service was appiled via a 'apply Service for' command then resolve the
|
||||||
|
// instance and add it as a tag (e.g. check_command = mtu, name = mtueth0, instance = eth0)
|
||||||
|
if (service && (service->GetName() != service->GetCheckCommand()->GetName())) {
|
||||||
|
tags->Set("instance", service->GetName().SubStr(service->GetCheckCommand()->GetName().GetLength()));
|
||||||
|
}
|
||||||
|
|
||||||
|
SendPerfdata(tmpl, cr, ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr tmpl, const CheckResult::Ptr& cr, double ts)
|
||||||
|
{
|
||||||
|
Array::Ptr perfdata = cr->GetPerformanceData();
|
||||||
|
|
||||||
|
if (!perfdata)
|
||||||
|
return;
|
||||||
|
|
||||||
|
ObjectLock olock(perfdata);
|
||||||
|
BOOST_FOREACH(const Value& val, perfdata) {
|
||||||
|
PerfdataValue::Ptr pdv;
|
||||||
|
|
||||||
|
if (val.IsObjectType<PerfdataValue>())
|
||||||
|
pdv = val;
|
||||||
|
else {
|
||||||
|
try {
|
||||||
|
pdv = PerfdataValue::Parse(val);
|
||||||
|
} catch (const std::exception&) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Ignoring invalid perfdata value: " << val;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SendMetric(tmpl, pdv->GetLabel(), "value", pdv->GetValue(), ts);
|
||||||
|
|
||||||
|
if (GetEnableSendThresholds()) {
|
||||||
|
if (pdv->GetCrit())
|
||||||
|
SendMetric(tmpl, pdv->GetLabel(), "crit", pdv->GetCrit(), ts);
|
||||||
|
if (pdv->GetWarn())
|
||||||
|
SendMetric(tmpl, pdv->GetLabel(), "warn", pdv->GetWarn(), ts);
|
||||||
|
if (pdv->GetMin())
|
||||||
|
SendMetric(tmpl, pdv->GetLabel(), "min", pdv->GetMin(), ts);
|
||||||
|
if (pdv->GetMax())
|
||||||
|
SendMetric(tmpl, pdv->GetLabel(), "max", pdv->GetMax(), ts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::SendMetric(const Dictionary::Ptr tmpl, const String& label, const String& type, double value, double ts)
|
||||||
|
{
|
||||||
|
std::ostringstream msgbuf;
|
||||||
|
msgbuf << tmpl->Get("measurement");
|
||||||
|
|
||||||
|
Dictionary::Ptr tags = tmpl->Get("tags");
|
||||||
|
if (tags) {
|
||||||
|
ObjectLock olock(tags);
|
||||||
|
BOOST_FOREACH(const Dictionary::Pair& pair, tags) {
|
||||||
|
// Empty macro expansion, no tag
|
||||||
|
if (!pair.second.IsEmpty())
|
||||||
|
msgbuf << "," << pair.first << "=" << pair.second;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msgbuf << ",metric=" << label << ",type=" << type << " value=" << value << " " << static_cast<unsigned long>(ts);
|
||||||
|
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Add to metric list:'" << msgbuf.str() << "'.";
|
||||||
|
|
||||||
|
// Atomically buffer the data point
|
||||||
|
ObjectLock olock(m_DataBuffer);
|
||||||
|
m_DataBuffer->Add(String(msgbuf.str()));
|
||||||
|
|
||||||
|
// Flush if we've buffered too much to prevent excessive memory use
|
||||||
|
if (m_DataBuffer->GetLength() >= GetFlushThreshold()) {
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points";
|
||||||
|
Flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::FlushTimeout(void)
|
||||||
|
{
|
||||||
|
// Prevent new data points from being added to the array, there is a
|
||||||
|
// race condition where they could disappear
|
||||||
|
ObjectLock olock(m_DataBuffer);
|
||||||
|
|
||||||
|
// Flush if there are any data available
|
||||||
|
if (m_DataBuffer->GetLength() > 0) {
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Timer expired writing " << m_DataBuffer->GetLength() << " data points";
|
||||||
|
Flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::Flush(void)
|
||||||
|
{
|
||||||
|
Stream::Ptr stream = Connect();
|
||||||
|
|
||||||
|
// Unable to connect, play it safe and lose the data points
|
||||||
|
// to avoid a memory leak
|
||||||
|
if (!stream.get()) {
|
||||||
|
m_DataBuffer->Clear();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Url::Ptr url = new Url();
|
||||||
|
url->SetScheme(GetSslEnable() ? "https" : "http");
|
||||||
|
url->SetHost(GetHost());
|
||||||
|
url->SetPort(GetPort());
|
||||||
|
|
||||||
|
std::vector<String> path;
|
||||||
|
path.push_back("write");
|
||||||
|
url->SetPath(path);
|
||||||
|
|
||||||
|
url->AddQueryElement("db", GetDatabase());
|
||||||
|
url->AddQueryElement("precision", "s");
|
||||||
|
if (!GetUsername().IsEmpty())
|
||||||
|
url->AddQueryElement("u", GetUsername());
|
||||||
|
if (!GetPassword().IsEmpty())
|
||||||
|
url->AddQueryElement("p", GetPassword());
|
||||||
|
|
||||||
|
// Ensure you hold a lock against m_DataBuffer so that things
|
||||||
|
// don't go missing after creating the body and clearing the buffer
|
||||||
|
String body = Utility::Join(m_DataBuffer, '\n');
|
||||||
|
m_DataBuffer->Clear();
|
||||||
|
|
||||||
|
HttpRequest req(stream);
|
||||||
|
req.RequestMethod = "POST";
|
||||||
|
req.RequestUrl = url;
|
||||||
|
|
||||||
|
try {
|
||||||
|
req.WriteBody(body.CStr(), body.GetLength());
|
||||||
|
req.Finish();
|
||||||
|
} catch (const std::exception&) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
}
|
||||||
|
|
||||||
|
HttpResponse resp(stream, req);
|
||||||
|
StreamReadContext context;
|
||||||
|
|
||||||
|
try {
|
||||||
|
resp.Parse(context, true);
|
||||||
|
} catch (const std::exception) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resp.StatusCode != 204) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Unexpected response code " << resp.StatusCode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
|
||||||
|
{
|
||||||
|
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
|
||||||
|
|
||||||
|
String measurement = value->Get("measurement");
|
||||||
|
if (!MacroProcessor::ValidateMacroString(measurement))
|
||||||
|
BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("host_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
|
||||||
|
|
||||||
|
Dictionary::Ptr tags = value->Get("tags");
|
||||||
|
if (tags) {
|
||||||
|
ObjectLock olock(tags);
|
||||||
|
BOOST_FOREACH(const Dictionary::Pair& pair, tags) {
|
||||||
|
if (!MacroProcessor::ValidateMacroString(pair.second))
|
||||||
|
BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("host_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InfluxdbWriter::ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
|
||||||
|
{
|
||||||
|
ObjectImpl<InfluxdbWriter>::ValidateServiceTemplate(value, utils);
|
||||||
|
|
||||||
|
String measurement = value->Get("measurement");
|
||||||
|
if (!MacroProcessor::ValidateMacroString(measurement))
|
||||||
|
BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("service_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
|
||||||
|
|
||||||
|
Dictionary::Ptr tags = value->Get("tags");
|
||||||
|
if (tags) {
|
||||||
|
ObjectLock olock(tags);
|
||||||
|
BOOST_FOREACH(const Dictionary::Pair& pair, tags) {
|
||||||
|
if (!MacroProcessor::ValidateMacroString(pair.second))
|
||||||
|
BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("service_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
/******************************************************************************
|
||||||
|
* Icinga 2 *
|
||||||
|
* Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) *
|
||||||
|
* *
|
||||||
|
* This program is free software; you can redistribute it and/or *
|
||||||
|
* modify it under the terms of the GNU General Public License *
|
||||||
|
* as published by the Free Software Foundation; either version 2 *
|
||||||
|
* of the License, or (at your option) any later version. *
|
||||||
|
* *
|
||||||
|
* This program is distributed in the hope that it will be useful, *
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||||
|
* GNU General Public License for more details. *
|
||||||
|
* *
|
||||||
|
* You should have received a copy of the GNU General Public License *
|
||||||
|
* along with this program; if not, write to the Free Software Foundation *
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
#ifndef INFLUXDBWRITER_H
|
||||||
|
#define INFLUXDBWRITER_H
|
||||||
|
|
||||||
|
#include "perfdata/influxdbwriter.thpp"
|
||||||
|
#include "icinga/service.hpp"
|
||||||
|
#include "base/configobject.hpp"
|
||||||
|
#include "base/tcpsocket.hpp"
|
||||||
|
#include "base/timer.hpp"
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
|
namespace icinga
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An Icinga InfluxDB writer.
|
||||||
|
*
|
||||||
|
* @ingroup perfdata
|
||||||
|
*/
|
||||||
|
class InfluxdbWriter : public ObjectImpl<InfluxdbWriter>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
DECLARE_OBJECT(InfluxdbWriter);
|
||||||
|
DECLARE_OBJECTNAME(InfluxdbWriter);
|
||||||
|
|
||||||
|
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
||||||
|
|
||||||
|
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||||
|
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
virtual void Start(bool runtimeCreated) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Timer::Ptr m_FlushTimer;
|
||||||
|
Array::Ptr m_DataBuffer;
|
||||||
|
|
||||||
|
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||||
|
void SendPerfdata(const Dictionary::Ptr tmpl, const CheckResult::Ptr& cr, double ts);
|
||||||
|
void SendMetric(const Dictionary::Ptr tmpl, const String& label, const String& type, double value, double ts);
|
||||||
|
void FlushTimeout(void);
|
||||||
|
void Flush(void);
|
||||||
|
|
||||||
|
Stream::Ptr Connect(void);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* INFLUXDBWRITER_H */
|
|
@ -0,0 +1,109 @@
|
||||||
|
/******************************************************************************
|
||||||
|
* Icinga 2 *
|
||||||
|
* Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) *
|
||||||
|
* *
|
||||||
|
* This program is free software; you can redistribute it and/or *
|
||||||
|
* modify it under the terms of the GNU General Public License *
|
||||||
|
* as published by the Free Software Foundation; either version 2 *
|
||||||
|
* of the License, or (at your option) any later version. *
|
||||||
|
* *
|
||||||
|
* This program is distributed in the hope that it will be useful, *
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||||
|
* GNU General Public License for more details. *
|
||||||
|
* *
|
||||||
|
* You should have received a copy of the GNU General Public License *
|
||||||
|
* along with this program; if not, write to the Free Software Foundation *
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
#include "base/configobject.hpp"
|
||||||
|
|
||||||
|
library perfdata;
|
||||||
|
|
||||||
|
namespace icinga
|
||||||
|
{
|
||||||
|
|
||||||
|
class InfluxdbWriter : ConfigObject
|
||||||
|
{
|
||||||
|
[config, required] String host {
|
||||||
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
};
|
||||||
|
[config, required] String port {
|
||||||
|
default {{{ return "8086"; }}}
|
||||||
|
};
|
||||||
|
[config, required] String database {
|
||||||
|
default {{{ return "icinga2"; }}}
|
||||||
|
};
|
||||||
|
[config] String username {
|
||||||
|
default {{{ return ""; }}}
|
||||||
|
};
|
||||||
|
[config, no_user_view] String password {
|
||||||
|
default {{{ return ""; }}}
|
||||||
|
};
|
||||||
|
[config] bool ssl_enable {
|
||||||
|
default {{{ return false; }}}
|
||||||
|
};
|
||||||
|
[config] String ssl_ca_cert {
|
||||||
|
default {{{ return ""; }}}
|
||||||
|
};
|
||||||
|
[config] String ssl_cert {
|
||||||
|
default {{{ return ""; }}}
|
||||||
|
};
|
||||||
|
[config] String ssl_key{
|
||||||
|
default {{{ return ""; }}}
|
||||||
|
};
|
||||||
|
[config, required] Dictionary::Ptr host_template {
|
||||||
|
default {{{
|
||||||
|
Dictionary::Ptr tags = new Dictionary();
|
||||||
|
tags->Set("hostname", "$host.name$");
|
||||||
|
|
||||||
|
Dictionary::Ptr tmpl = new Dictionary();
|
||||||
|
tmpl->Set("measurement", "$host.check_command$");
|
||||||
|
tmpl->Set("tags", tags);
|
||||||
|
|
||||||
|
return tmpl;
|
||||||
|
}}}
|
||||||
|
};
|
||||||
|
[config, required] Dictionary::Ptr service_template {
|
||||||
|
default {{{
|
||||||
|
Dictionary::Ptr tags = new Dictionary();
|
||||||
|
tags->Set("hostname", "$host.name$");
|
||||||
|
tags->Set("service", "$service.name$");
|
||||||
|
|
||||||
|
Dictionary::Ptr tmpl = new Dictionary();
|
||||||
|
tmpl->Set("measurement", "$service.check_command$");
|
||||||
|
tmpl->Set("tags", tags);
|
||||||
|
|
||||||
|
return tmpl;
|
||||||
|
}}}
|
||||||
|
};
|
||||||
|
[config] bool enable_send_thresholds {
|
||||||
|
default {{{ return false; }}}
|
||||||
|
};
|
||||||
|
[config] int flush_interval {
|
||||||
|
default {{{ return 10; }}}
|
||||||
|
};
|
||||||
|
[config] int flush_threshold {
|
||||||
|
default {{{ return 1024; }}}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
validator InfluxdbWriter {
|
||||||
|
Dictionary host_template {
|
||||||
|
required measurement;
|
||||||
|
String measurement;
|
||||||
|
Dictionary "tags" {
|
||||||
|
String "*";
|
||||||
|
};
|
||||||
|
};
|
||||||
|
Dictionary service_template {
|
||||||
|
required measurement;
|
||||||
|
String measurement;
|
||||||
|
Dictionary "tags" {
|
||||||
|
String "*";
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
|
@ -263,7 +263,12 @@ String Url::Format(bool print_credentials) const
|
||||||
if (!temp.IsEmpty())
|
if (!temp.IsEmpty())
|
||||||
temp += "&";
|
temp += "&";
|
||||||
|
|
||||||
temp += key + "[]=" + Utility::EscapeString(s, ACQUERY, false);
|
temp += key;
|
||||||
|
|
||||||
|
if (kv.second.size() > 1)
|
||||||
|
temp += "[]";
|
||||||
|
|
||||||
|
temp += "=" + Utility::EscapeString(s, ACQUERY, false);
|
||||||
}
|
}
|
||||||
param += temp;
|
param += temp;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue