diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index 6bc9cd9bc..770d5d0ba 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -49,6 +49,24 @@ using namespace icinga; +class InfluxdbInteger : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(InfluxdbInteger); + + InfluxdbInteger(int value) + : m_Value(value) + { } + + int GetValue(void) const + { + return m_Value; + } + +private: + int m_Value; +}; + REGISTER_TYPE(InfluxdbWriter); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); @@ -106,7 +124,7 @@ void InfluxdbWriter::Start(bool runtimeCreated) m_FlushTimer->Reschedule(0); /* Register for new metrics. */ - Service::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); + Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); } void InfluxdbWriter::Stop(bool runtimeRemoved) @@ -134,7 +152,7 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) //TODO: Close the connection, if we keep it open. } -Stream::Ptr InfluxdbWriter::Connect() +Stream::Ptr InfluxdbWriter::Connect(void) { TcpSocket::Ptr socket = new TcpSocket(); @@ -176,10 +194,10 @@ Stream::Ptr InfluxdbWriter::Connect() void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { - m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr)); + m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow); } -void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { AssertOnWorkQueue(); @@ -211,28 +229,16 @@ void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, if (tags) { ObjectLock olock(tags); for (const Dictionary::Pair& pair : tags) { - // Prevent missing macros from warning; will return an empty value - // which will be filtered out in SendMetric() String missing_macro; - tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro)); + Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); + + if (!missing_macro.IsEmpty()) + continue; + + tags->Set(pair.first, value); } } - SendPerfdata(tmpl, checkable, cr, ts); -} - -String InfluxdbWriter::FormatInteger(int val) -{ - return Convert::ToString(val) + "i"; -} - -String InfluxdbWriter::FormatBoolean(bool val) -{ - return val ? "true" : "false"; -} - -void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts) -{ Array::Ptr perfdata = cr->GetPerformanceData(); if (perfdata) { ObjectLock olock(perfdata); @@ -280,16 +286,16 @@ void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable:: Dictionary::Ptr fields = new Dictionary(); if (service) - fields->Set("state", FormatInteger(service->GetState())); + fields->Set("state", new InfluxdbInteger(service->GetState())); else - fields->Set("state", FormatInteger(host->GetState())); + fields->Set("state", new InfluxdbInteger(host->GetState())); - fields->Set("current_attempt", FormatInteger(checkable->GetCheckAttempt())); - fields->Set("max_check_attempts", FormatInteger(checkable->GetMaxCheckAttempts())); - fields->Set("state_type", FormatInteger(checkable->GetStateType())); - fields->Set("reachable", FormatBoolean(checkable->IsReachable())); - fields->Set("downtime_depth", FormatInteger(checkable->GetDowntimeDepth())); - fields->Set("acknowledgement", FormatInteger(checkable->GetAcknowledgement())); + fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt())); + fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts())); + fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType())); + fields->Set("reachable", checkable->IsReachable()); + fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth())); + fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement())); fields->Set("latency", cr->CalculateLatency()); fields->Set("execution_time", cr->CalculateExecutionTime()); @@ -297,7 +303,7 @@ void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable:: } } -String InfluxdbWriter::EscapeKey(const String& str) +String InfluxdbWriter::EscapeKeyOrTagValue(const String& str) { // Iterate over the key name and escape commas and spaces with a backslash String result = str; @@ -305,57 +311,28 @@ String InfluxdbWriter::EscapeKey(const String& str) boost::algorithm::replace_all(result, "=", "\\="); boost::algorithm::replace_all(result, ",", "\\,"); boost::algorithm::replace_all(result, " ", "\\ "); - - // InfluxDB 'feature': although backslashes are allowed in keys they also act - // as escape sequences when followed by ',' or ' '. When your tag is like - // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped - // and through experimentation they also escape '='. To be safe we replace - // trailing backslashes with and underscore. - size_t length = result.GetLength(); - if (result[length - 1] == '\\') - result[length - 1] = '_'; - return result; } -String InfluxdbWriter::EscapeField(const String& str) +String InfluxdbWriter::EscapeValue(const Value& value) { - //TODO: Evaluate whether boost::regex is really needed here. - - // Handle integers - boost::regex integer("-?\\d+i"); - if (boost::regex_match(str.GetData(), integer)) { - return str; + if (value.IsObjectType()) { + std::ostringstream os; + os << static_cast(value)->GetValue() + << "i"; + return os.str(); } - // Handle numerics - boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?"); - if (boost::regex_match(str.GetData(), numeric)) { - return str; - } + if (value.IsBoolean()) + return value ? "true" : "false"; - // Handle booleans - boost::regex boolean_true("t|true", boost::regex::icase); - if (boost::regex_match(str.GetData(), boolean_true)) - return "true"; - boost::regex boolean_false("f|false", boost::regex::icase); - if (boost::regex_match(str.GetData(), boolean_false)) - return "false"; - - // Handle NaNs - if (boost::math::isnan(str)) - return 0; - - // Otherwise it's a string and needs escaping and quoting - String result = str; - boost::algorithm::replace_all(result, "\"", "\\\""); - return "\"" + result + "\""; + return value; } void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts) { std::ostringstream msgbuf; - msgbuf << EscapeKey(tmpl->Get("measurement")); + msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement")); Dictionary::Ptr tags = tmpl->Get("tags"); if (tags) { @@ -363,14 +340,14 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label for (const Dictionary::Pair& pair : tags) { // Empty macro expansion, no tag if (!pair.second.IsEmpty()) { - msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second); + msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second); } } } - // Label is may be empty in the case of metadata + // Label may be empty in the case of metadata if (!label.IsEmpty()) - msgbuf << ",metric=" << EscapeKey(label); + msgbuf << ",metric=" << EscapeKeyOrTagValue(label); msgbuf << " "; @@ -384,45 +361,54 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label else msgbuf << ","; - msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second); + msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second); } } msgbuf << " " << static_cast(ts); +#ifdef I2_DEBUG Log(LogDebug, "InfluxdbWriter") << "Add to metric list: '" << msgbuf.str() << "'."; +#endif /* I2_DEBUG */ - // Atomically buffer the data point - boost::mutex::scoped_lock lock(m_DataBufferMutex); - m_DataBuffer.push_back(String(msgbuf.str())); + // Buffer the data point + m_DataBuffer.push_back(msgbuf.str()); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { Log(LogDebug, "InfluxdbWriter") << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; - Flush(); + + try { + Flush(); + } catch (...) { + /* Do nothing. */ + } } } void InfluxdbWriter::FlushTimeout(void) { - // Prevent new data points from being added to the array, there is a - // race condition where they could disappear - boost::mutex::scoped_lock lock(m_DataBufferMutex); + m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushTimeoutWQ, this), PriorityHigh); +} + +void InfluxdbWriter::FlushTimeoutWQ(void) +{ + AssertOnWorkQueue(); // Flush if there are any data available - if (m_DataBuffer.size() > 0) { - Log(LogDebug, "InfluxdbWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); - } + if (m_DataBuffer.empty()) + return; + + Log(LogDebug, "InfluxdbWriter") + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + + Flush(); } void InfluxdbWriter::Flush(void) { - // Ensure you hold a lock against m_DataBuffer so that things - // don't go missing after creating the body and clearing the buffer String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); @@ -460,25 +446,27 @@ void InfluxdbWriter::Flush(void) throw ex; } - //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options. HttpResponse resp(stream, req); StreamReadContext context; try { - resp.Parse(context, true); + while (resp.Parse(context, true) && !resp.Complete) + ; /* Do nothing */ } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") - << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); throw ex; } + if (!resp.Complete) { + Log(LogWarning, "InfluxdbWriter") + << "Failed to read a complete HTTP response from the InfluxDB server."; + return; + } + if (resp.StatusCode != 204) { Log(LogWarning, "InfluxdbWriter") - << "Unexpected response code " << resp.StatusCode; - - // Finish parsing the headers and body - while (!resp.Complete) - resp.Parse(context, true); + << "Unexpected response code: " << resp.StatusCode; String contentType = resp.Headers->Get("content-type"); if (contentType != "application/json") { @@ -505,6 +493,8 @@ void InfluxdbWriter::Flush(void) Log(LogCritical, "InfluxdbWriter") << "InfluxDB error message:\n" << error; + + return; } } diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index e0a149c3a..5d924c376 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -26,7 +26,6 @@ #include "base/tcpsocket.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" -#include #include namespace icinga @@ -59,20 +58,16 @@ private: WorkQueue m_WorkQueue; Timer::Ptr m_FlushTimer; std::vector m_DataBuffer; - boost::mutex m_DataBufferMutex; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts); + void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(void); + void FlushTimeoutWQ(void); void Flush(void); - static String FormatInteger(int val); - static String FormatBoolean(bool val); - - static String EscapeKey(const String& str); - static String EscapeField(const String& str); + static String EscapeKeyOrTagValue(const String& str); + static String EscapeValue(const Value& value); Stream::Ptr Connect();