Update InfluxDB line formatting

Fixes a couple issues to do with line formatting of influx DB data points.  All
keys and values need commas and white space escaping.  Values are also checked
for type.  If a numeric or scientific value is detected this is output as an
Influx floating point/scientific number.  Booleans are detected and output in
a canonical format.  All other values are strings, which have double quotes
escaped and the entire string is wrapped in double quotes.  The handling of
thresholds has changed before this becomes officially released.  These values
if available are passed to the accumulation function in a dictionary, said
dictionary builds a single data point with multiple fields, rather than the
existing 5 data points, thus saving bandwidth costs.

fixes #11904

Signed-off-by: Gunnar Beutner <gunnar.beutner@netways.de>
This commit is contained in:
Simon Murray 2016-06-07 13:35:16 +01:00 committed by Gunnar Beutner
parent 0eb0992d5e
commit 899592c8ad
3 changed files with 110 additions and 26 deletions

View File

@ -882,10 +882,10 @@ Example:
}
Measurement names and tags are fully configurable by the end user. The InfluxdbWriter
object will automatically add a `metric` and `type` tag to each data point. These
correlate to perfdata label and perfdata field (value, warn, crit, min, max) respectively.
If a value associated with a tag is not able to be resolved, it will be dropped and not
sent to the target host.
object will automatically add a `metric` tag to each data point. This correlates to the
perfdata label. Fields (value, warn, crit, min, max) are created from data if available
and the configuration allows it. If a value associated with a tag is not able to be
resolved, it will be dropped and not sent to the target host.
The database is assumed to exist so this object will make no attempt to create it currently.
@ -908,6 +908,45 @@ Configuration Attributes:
flush_interval | **Optional.** How long to buffer data points before transfering to InfluxDB. Defaults to `10s`.
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
### <a id="objecttype-influxdbwriter-instance-tags"></a> Instance Tagging
Consider the following service check:
apply Service "disk" for (disk => attributes in host.vars.disks) {
import "generic-service"
check_command = "disk"
display_name = "Disk " + disk
vars.disk_partitions = disk
assign where host.vars.disks
}
This is a typical pattern for checking individual disks, NICs, SSL certificates etc associated
with a host. What would be useful is to have the data points tagged with the specific instance
for that check. This would allow you to query time series data for a check on a host and for a
specific instance e.g. /dev/sda. To do this quite simply add the instance to the service variables:
apply Service "disk" for (disk => attributes in host.vars.disks) {
...
vars.instance = disk
...
}
Then modify your writer configuration to add this tag to your data points if the instance variable
is associated with the service:
object InfluxdbWriter "influxdb" {
...
service_template = {
measurement = "$service.check_command$"
tags = {
hostname = "$host.name$"
service = "$service.name$"
instance = "$service.vars.instance$"
}
}
...
}
## <a id="objecttype-livestatuslistener"></a> LiveStatusListener
Livestatus API interface available as TCP or UNIX socket. Historical table queries

View File

@ -45,6 +45,7 @@
#include <boost/foreach.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/regex.hpp>
using namespace icinga;
@ -140,8 +141,6 @@ void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
double ts = cr->GetExecutionEnd();
// Clone the template and perform an in-place macro expansion of measurement and tag values
// Work Needed: Escape ' ', ',' and '=' in field keys, tag keys and tag values
// Quote field values when the type is string
Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->Clone());
tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
@ -158,16 +157,10 @@ retry:
}
}
// If the service was appiled via a 'apply Service for' command then resolve the
// instance and add it as a tag (e.g. check_command = mtu, name = mtueth0, instance = eth0)
if (service && (service->GetName() != service->GetCheckCommand()->GetName())) {
tags->Set("instance", service->GetName().SubStr(service->GetCheckCommand()->GetName().GetLength()));
}
SendPerfdata(tmpl, cr, ts);
}
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr tmpl, const CheckResult::Ptr& cr, double ts)
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
@ -190,37 +183,86 @@ void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr tmpl, const CheckResult:
}
}
SendMetric(tmpl, pdv->GetLabel(), "value", pdv->GetValue(), ts);
Dictionary::Ptr fields = new Dictionary();
fields->Set(String("value"), pdv->GetValue());
if (GetEnableSendThresholds()) {
if (pdv->GetCrit())
SendMetric(tmpl, pdv->GetLabel(), "crit", pdv->GetCrit(), ts);
fields->Set(String("crit"), pdv->GetCrit());
if (pdv->GetWarn())
SendMetric(tmpl, pdv->GetLabel(), "warn", pdv->GetWarn(), ts);
fields->Set(String("warn"), pdv->GetWarn());
if (pdv->GetMin())
SendMetric(tmpl, pdv->GetLabel(), "min", pdv->GetMin(), ts);
fields->Set(String("min"), pdv->GetMin());
if (pdv->GetMax())
SendMetric(tmpl, pdv->GetLabel(), "max", pdv->GetMax(), ts);
fields->Set(String("max"), pdv->GetMax());
}
SendMetric(tmpl, pdv->GetLabel(), fields, ts);
}
}
void InfluxdbWriter::SendMetric(const Dictionary::Ptr tmpl, const String& label, const String& type, double value, double ts)
String InfluxdbWriter::EscapeKey(const String& str)
{
// Iterate over the key name and escape commas and spaces with a backslash
String result = str;
boost::algorithm::replace_all(result, ",", "\\,");
boost::algorithm::replace_all(result, " ", "\\ ");
return str;
}
String InfluxdbWriter::EscapeField(const String& str)
{
// Technically everything entering here from PerfdataValue is a
// double, but best have the safety net in place.
// Handle numerics
boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?");
if (boost::regex_match(str.GetData(), numeric)) {
return str;
}
// Handle booleans
boost::regex boolean_true("t|true", boost::regex::icase);
if (boost::regex_match(str.GetData(), boolean_true))
return "true";
boost::regex boolean_false("f|false", boost::regex::icase);
if (boost::regex_match(str.GetData(), boolean_false))
return "false";
// Otherwise it's a string and needs escaping and quoting
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
return "\"" + result + "\"";
}
void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts)
{
std::ostringstream msgbuf;
msgbuf << tmpl->Get("measurement");
msgbuf << EscapeKey(tmpl->Get("measurement"));
Dictionary::Ptr tags = tmpl->Get("tags");
if (tags) {
ObjectLock olock(tags);
BOOST_FOREACH(const Dictionary::Pair& pair, tags) {
// Empty macro expansion, no tag
if (!pair.second.IsEmpty())
msgbuf << "," << pair.first << "=" << pair.second;
if (!pair.second.IsEmpty()) {
msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second);
}
}
}
msgbuf << ",metric=" << label << ",type=" << type << " value=" << value << " " << static_cast<unsigned long>(ts);
msgbuf << ",metric=" << label << " ";
bool first = true;
ObjectLock fieldLock(fields);
BOOST_FOREACH(const Dictionary::Pair& pair, fields) {
if (first)
first = false;
else
msgbuf << ",";
msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
}
msgbuf << " " << static_cast<unsigned long>(ts);
Log(LogDebug, "InfluxdbWriter")
<< "Add to metric list:'" << msgbuf.str() << "'.";

View File

@ -54,11 +54,14 @@ private:
Array::Ptr m_DataBuffer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr tmpl, const CheckResult::Ptr& cr, double ts);
void SendMetric(const Dictionary::Ptr tmpl, const String& label, const String& type, double value, double ts);
void SendPerfdata(const Dictionary::Ptr& tmpl, const CheckResult::Ptr& cr, double ts);
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout(void);
void Flush(void);
static String EscapeKey(const String& str);
static String EscapeField(const String& str);
Stream::Ptr Connect(void);
};