diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index 499e77a89..aa35fec0e 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -49,24 +49,6 @@ using namespace icinga; -class InfluxdbInteger : public Object -{ -public: - DECLARE_PTR_TYPEDEFS(InfluxdbInteger); - - InfluxdbInteger(int value) - : m_Value(value) - { } - - int GetValue(void) const - { - return m_Value; - } - -private: - int m_Value; -}; - REGISTER_TYPE(InfluxdbWriter); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); @@ -124,7 +106,7 @@ void InfluxdbWriter::Start(bool runtimeCreated) m_FlushTimer->Reschedule(0); /* Register for new metrics. */ - Checkable::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); + Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); } void InfluxdbWriter::Stop(bool runtimeRemoved) @@ -152,7 +134,7 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) //TODO: Close the connection, if we keep it open. 
} -Stream::Ptr InfluxdbWriter::Connect(void) +Stream::Ptr InfluxdbWriter::Connect() { TcpSocket::Ptr socket = new TcpSocket(); @@ -194,10 +176,10 @@ Stream::Ptr InfluxdbWriter::Connect(void) void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { - m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow); + m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr)); } -void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { AssertOnWorkQueue(); @@ -229,16 +211,28 @@ void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const if (tags) { ObjectLock olock(tags); for (const Dictionary::Pair& pair : tags) { + // Prevent missing macros from warning; will return an empty value + // which will be filtered out in SendMetric() String missing_macro; - Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); - - if (!missing_macro.IsEmpty()) - continue; - - tags->Set(pair.first, value); + tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro)); } } + SendPerfdata(tmpl, checkable, cr, ts); +} + +String InfluxdbWriter::FormatInteger(int val) +{ + return Convert::ToString(val) + "i"; +} + +String InfluxdbWriter::FormatBoolean(bool val) +{ + return val ? 
"true" : "false"; +} + +void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts) +{ Array::Ptr perfdata = cr->GetPerformanceData(); if (perfdata) { ObjectLock olock(perfdata); @@ -283,16 +277,16 @@ void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const Dictionary::Ptr fields = new Dictionary(); if (service) - fields->Set("state", new InfluxdbInteger(service->GetState())); + fields->Set("state", FormatInteger(service->GetState())); else - fields->Set("state", new InfluxdbInteger(host->GetState())); + fields->Set("state", FormatInteger(host->GetState())); - fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt())); - fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts())); - fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType())); - fields->Set("reachable", checkable->IsReachable()); - fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth())); - fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement())); + fields->Set("current_attempt", FormatInteger(checkable->GetCheckAttempt())); + fields->Set("max_check_attempts", FormatInteger(checkable->GetMaxCheckAttempts())); + fields->Set("state_type", FormatInteger(checkable->GetStateType())); + fields->Set("reachable", FormatBoolean(checkable->IsReachable())); + fields->Set("downtime_depth", FormatInteger(checkable->GetDowntimeDepth())); + fields->Set("acknowledgement", FormatInteger(checkable->GetAcknowledgement())); fields->Set("latency", cr->CalculateLatency()); fields->Set("execution_time", cr->CalculateExecutionTime()); @@ -300,7 +294,7 @@ void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const } } -String InfluxdbWriter::EscapeKeyOrTagValue(const String& str) +String InfluxdbWriter::EscapeKey(const String& str) { // Iterate over the key name and escape commas and 
spaces with a backslash String result = str; @@ -308,28 +302,57 @@ String InfluxdbWriter::EscapeKeyOrTagValue(const String& str) boost::algorithm::replace_all(result, "=", "\\="); boost::algorithm::replace_all(result, ",", "\\,"); boost::algorithm::replace_all(result, " ", "\\ "); + + // InfluxDB 'feature': although backslashes are allowed in keys they also act + // as escape sequences when followed by ',' or ' '. When your tag is like + // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped + // and through experimentation they also escape '='. To be safe we replace + // trailing backslashes with an underscore. + size_t length = result.GetLength(); + if (result[length - 1] == '\\') + result[length - 1] = '_'; + return result; } -String InfluxdbWriter::EscapeValue(const Value& value) +String InfluxdbWriter::EscapeField(const String& str) { - if (value.IsObjectType()) { - std::ostringstream os; - os << static_cast(value)->GetValue() - << "i"; - return os.str(); + //TODO: Evaluate whether boost::regex is really needed here. + + // Handle integers + boost::regex integer("-?\\d+i"); + if (boost::regex_match(str.GetData(), integer)) { + return str; } - if (value.IsBoolean()) - return value ? 
"true" : "false"; + // Handle numerics + boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?"); + if (boost::regex_match(str.GetData(), numeric)) { + return str; + } - return value; + // Handle booleans + boost::regex boolean_true("t|true", boost::regex::icase); + if (boost::regex_match(str.GetData(), boolean_true)) + return "true"; + boost::regex boolean_false("f|false", boost::regex::icase); + if (boost::regex_match(str.GetData(), boolean_false)) + return "false"; + + // Handle NaNs + if (boost::math::isnan(str)) + return 0; + + // Otherwise it's a string and needs escaping and quoting + String result = str; + boost::algorithm::replace_all(result, "\"", "\\\""); + return "\"" + result + "\""; } void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts) { std::ostringstream msgbuf; - msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement")); + msgbuf << EscapeKey(tmpl->Get("measurement")); Dictionary::Ptr tags = tmpl->Get("tags"); if (tags) { @@ -337,14 +360,14 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label for (const Dictionary::Pair& pair : tags) { // Empty macro expansion, no tag if (!pair.second.IsEmpty()) { - msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second); + msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second); } } } - // Label may be empty in the case of metadata + // Label may be empty in the case of metadata if (!label.IsEmpty()) - msgbuf << ",metric=" << EscapeKeyOrTagValue(label); + msgbuf << ",metric=" << EscapeKey(label); msgbuf << " "; @@ -358,54 +381,45 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label else msgbuf << ","; - msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second); + msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second); } } msgbuf << " " << static_cast(ts); -#ifdef I2_DEBUG Log(LogDebug, 
"InfluxdbWriter") << "Add to metric list: '" << msgbuf.str() << "'."; -#endif /* I2_DEBUG */ - // Buffer the data point - m_DataBuffer.push_back(msgbuf.str()); + // Atomically buffer the data point + boost::mutex::scoped_lock lock(m_DataBufferMutex); + m_DataBuffer.push_back(String(msgbuf.str())); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { Log(LogDebug, "InfluxdbWriter") << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; - - try { - Flush(); - } catch (...) { - /* Do nothing. */ - } + Flush(); } } void InfluxdbWriter::FlushTimeout(void) { - m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushTimeoutWQ, this), PriorityHigh); -} - -void InfluxdbWriter::FlushTimeoutWQ(void) -{ - AssertOnWorkQueue(); + // Prevent new data points from being added to the array, there is a + // race condition where they could disappear + boost::mutex::scoped_lock lock(m_DataBufferMutex); // Flush if there are any data available - if (m_DataBuffer.empty()) - return; - - Log(LogDebug, "InfluxdbWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - - Flush(); + if (m_DataBuffer.size() > 0) { + Log(LogDebug, "InfluxdbWriter") + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + Flush(); + } } void InfluxdbWriter::Flush(void) { + // Ensure you hold a lock against m_DataBuffer so that things + // don't go missing after creating the body and clearing the buffer String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); @@ -443,27 +457,25 @@ void InfluxdbWriter::Flush(void) throw ex; } + //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options. 
HttpResponse resp(stream, req); StreamReadContext context; try { - while (resp.Parse(context, true) && !resp.Complete) - ; /* Do nothing */ + resp.Parse(context, true); } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); + << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; throw ex; } - if (!resp.Complete) { - Log(LogWarning, "InfluxdbWriter") - << "Failed to read a complete HTTP response from the InfluxDB server."; - return; - } - if (resp.StatusCode != 204) { Log(LogWarning, "InfluxdbWriter") - << "Unexpected response code: " << resp.StatusCode; + << "Unexpected response code " << resp.StatusCode; + + // Finish parsing the headers and body + while (!resp.Complete) + resp.Parse(context, true); String contentType = resp.Headers->Get("content-type"); if (contentType != "application/json") { @@ -490,8 +502,6 @@ void InfluxdbWriter::Flush(void) Log(LogCritical, "InfluxdbWriter") << "InfluxDB error message:\n" << error; - - return; } } diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index 97546bc84..e0a149c3a 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -59,16 +59,20 @@ private: WorkQueue m_WorkQueue; Timer::Ptr m_FlushTimer; std::vector m_DataBuffer; + boost::mutex m_DataBufferMutex; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts); void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(void); - void 
FlushTimeoutWQ(void); void Flush(void); - static String EscapeKeyOrTagValue(const String& str); - static String EscapeValue(const Value& value); + static String FormatInteger(int val); + static String FormatBoolean(bool val); + + static String EscapeKey(const String& str); + static String EscapeField(const String& str); Stream::Ptr Connect();