From 3649a5a0d786cbdce93a1397309c7e73247dead4 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Thu, 4 May 2017 10:29:49 +0200 Subject: [PATCH] InfluxdbWriter: Use a work queue for async message processing; add stats log/api --- doc/9-object-types.md | 8 + etc/icinga2/features-available/influxdb.conf | 2 + lib/perfdata/influxdbwriter.cpp | 194 ++++++++++++++----- lib/perfdata/influxdbwriter.hpp | 28 ++- 4 files changed, 184 insertions(+), 48 deletions(-) diff --git a/doc/9-object-types.md b/doc/9-object-types.md index f16d09fab..73ab4fdd4 100644 --- a/doc/9-object-types.md +++ b/doc/9-object-types.md @@ -886,6 +886,10 @@ Example: host = "127.0.0.1" port = 8086 database = "icinga2" + + flush_threshold = 1024 + flush_interval = 10s + host_template = { measurement = "$host.check_command$" tags = { @@ -935,6 +939,10 @@ Configuration Attributes: flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. socket_timeout | **Optional.** How long to wait for InfluxDB to respond. Defaults to `5s`. +Note: If `flush_threshold` is set too low, this will always force the feature to flush all data +to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second +or similar. + ### Instance Tagging Consider the following service check: diff --git a/etc/icinga2/features-available/influxdb.conf b/etc/icinga2/features-available/influxdb.conf index 058568bf1..20f9ed253 100644 --- a/etc/icinga2/features-available/influxdb.conf +++ b/etc/icinga2/features-available/influxdb.conf @@ -9,6 +9,8 @@ object InfluxdbWriter "influxdb" { //host = "127.0.0.1" //port = 8086 //database = "icinga2" + //flush_threshold = 1024 + //flush_interval = 10s //host_template = { // measurement = "$host.check_command$" // tags = { diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index 198d0658f..a8e63d235 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -25,7 +25,6 @@ #include "icinga/service.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "icinga/compatutility.hpp" #include "icinga/perfdatavalue.hpp" #include "icinga/checkcommand.hpp" #include "base/tcpsocket.hpp" @@ -34,7 +33,6 @@ #include "base/logger.hpp" #include "base/convert.hpp" #include "base/utility.hpp" -#include "base/application.hpp" #include "base/stream.hpp" #include "base/networkstream.hpp" #include "base/exception.hpp" @@ -52,12 +50,32 @@ REGISTER_TYPE(InfluxdbWriter); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); +//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments. +InfluxdbWriter::InfluxdbWriter(void) + : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0) +{ } + +void InfluxdbWriter::OnConfigLoaded(void) +{ + ObjectImpl::OnConfigLoaded(); + + m_WorkQueue.SetName("InfluxdbWriter, " + GetName()); +} + void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) { Dictionary::Ptr nodes = new Dictionary(); for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) { - nodes->Set(influxdbwriter->GetName(), 1); //add more stats + size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength(); + size_t dataBufferItems = influxdbwriter->m_DataBuffer.size(); + + //TODO: Collect more stats + Dictionary::Ptr stats = new Dictionary(); + stats->Set("work_queue_items", workQueueItems); + stats->Set("data_buffer_items", dataBufferItems); + + nodes->Set(influxdbwriter->GetName(), stats); } status->Set("influxdbwriter", nodes); @@ -65,19 +83,28 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) void InfluxdbWriter::Start(bool runtimeCreated) { - m_DataBuffer = new Array(); - ObjectImpl::Start(runtimeCreated); Log(LogInformation, "InfluxdbWriter") << "'" << GetName() << "' started."; + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1)); + + /* Setup timer for periodically flushing m_DataBuffer */ m_FlushTimer = new Timer(); m_FlushTimer->SetInterval(GetFlushInterval()); m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this)); m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + /* Timer for updating and logging work queue stats */ + m_StatsLoggerTimer = new Timer(); + m_StatsLoggerTimer->SetInterval(60); // don't be too noisy + m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this)); + m_StatsLoggerTimer->Start(); + + /* Register for new metrics. */ Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); } @@ -86,9 +113,54 @@ void InfluxdbWriter::Stop(bool runtimeRemoved) Log(LogInformation, "InfluxdbWriter") << "'" << GetName() << "' stopped."; + m_WorkQueue.Join(); + ObjectImpl::Stop(runtimeRemoved); } +void InfluxdbWriter::AssertOnWorkQueue(void) +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!"); + + Log(LogDebug, "InfluxdbWriter") + << "Exception during InfluxDB operation: " << DiagnosticInformation(exp); + + //TODO: Close the connection, if we keep it open. +} + +void InfluxdbWriter::StatsLoggerTimerHandler(void) +{ + int pending = m_WorkQueue.GetLength(); + + double now = Utility::GetTime(); + double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp); + double timeToZero = pending / gradient; + + String timeInfo; + + if (pending > GetTaskCount(5)) { + timeInfo = " empty in "; + if (timeToZero < 0) + timeInfo += "infinite time, your backend isn't able to keep up"; + else + timeInfo += Utility::FormatDuration(timeToZero); + } + + m_PendingTasks = pending; + m_PendingTasksTimestamp = now; + + Log(LogInformation, "InfluxdbWriter") + << "Work queue items: " << pending + << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s" + << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);" + << timeInfo; +} + Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket) { socket = new TcpSocket(); @@ -98,32 +170,32 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket) try { socket->Connect(GetHost(), GetPort()); - } catch (std::exception&) { + } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - return Stream::Ptr(); + throw ex; } if (GetSslEnable()) { - boost::shared_ptr ssl_context; + boost::shared_ptr sslContext; try { - ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (std::exception&) { + sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Unable to create SSL context."; - return Stream::Ptr(); + throw ex; } - TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context); + TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); try { - tls_stream->Handshake(); - } catch (std::exception&) { + tlsStream->Handshake(); + } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "TLS handshake with host '" << GetHost() << "' failed."; - return Stream::Ptr(); + throw ex; } - return tls_stream; + return tlsStream; } else { return new NetworkStream(socket); } @@ -176,7 +248,7 @@ String InfluxdbWriter::FormatInteger(const int val) String InfluxdbWriter::FormatBoolean(const bool val) { - return val ? "true" : "false"; + return String(val); } void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts) @@ -265,6 +337,8 @@ String InfluxdbWriter::EscapeKey(const String& str) String InfluxdbWriter::EscapeField(const String& str) { + //TODO: Evaluate whether boost::regex is really needed here. + // Handle integers boost::regex integer("-?\\d+i"); if (boost::regex_match(str.GetData(), integer)) { @@ -313,29 +387,33 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label msgbuf << " "; - bool first = true; - ObjectLock fieldLock(fields); - for (const Dictionary::Pair& pair : fields) { - if (first) - first = false; - else - msgbuf << ","; - msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second); + { + bool first = true; + + ObjectLock fieldLock(fields); + for (const Dictionary::Pair& pair : fields) { + if (first) + first = false; + else + msgbuf << ","; + + msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second); + } } msgbuf << " " << static_cast(ts); Log(LogDebug, "InfluxdbWriter") - << "Add to metric list:'" << msgbuf.str() << "'."; + << "Add to metric list: '" << msgbuf.str() << "'."; // Atomically buffer the data point - ObjectLock olock(m_DataBuffer); - m_DataBuffer->Add(String(msgbuf.str())); + boost::mutex::scoped_lock lock(m_DataBufferMutex); + m_DataBuffer.push_back(String(msgbuf.str())); // Flush if we've buffered too much to prevent excessive memory use - if (static_cast(m_DataBuffer->GetLength()) >= GetFlushThreshold()) { + if (m_DataBuffer.size() >= GetFlushThreshold()) { Log(LogDebug, "InfluxdbWriter") - << "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points"; + << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; Flush(); } } @@ -344,27 +422,38 @@ void InfluxdbWriter::FlushTimeout(void) { // Prevent new data points from being added to the array, there is a // race condition where they could disappear - ObjectLock olock(m_DataBuffer); + boost::mutex::scoped_lock lock(m_DataBufferMutex); // Flush if there are any data available - if (m_DataBuffer->GetLength() > 0) { + if (m_DataBuffer.size() > 0) { Log(LogDebug, "InfluxdbWriter") - << "Timer expired writing " << m_DataBuffer->GetLength() << " data points"; + << "Timer expired writing " << m_DataBuffer.size() << " data points"; Flush(); } } void InfluxdbWriter::Flush(void) { + // Ensure you hold a lock against m_DataBuffer so that things + // don't go missing after creating the body and clearing the buffer + String body = boost::algorithm::join(m_DataBuffer, "\n"); + m_DataBuffer.clear(); + + // Asynchronously flush the metric body to InfluxDB + m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body)); +} + +void InfluxdbWriter::FlushHandler(const String& body) +{ + AssertOnWorkQueue(); + TcpSocket::Ptr socket; Stream::Ptr stream = Connect(socket); - // Unable to connect, play it safe and lose the data points - // to avoid a memory leak - if (!stream.get()) { - m_DataBuffer->Clear(); + if (!stream) return; - } + + IncreaseTaskCount(); Url::Ptr url = new Url(); url->SetScheme(GetSslEnable() ? "https" : "http"); @@ -382,11 +471,6 @@ void InfluxdbWriter::Flush(void) if (!GetPassword().IsEmpty()) url->AddQueryElement("p", GetPassword()); - // Ensure you hold a lock against m_DataBuffer so that things - // don't go missing after creating the body and clearing the buffer - String body = Utility::Join(m_DataBuffer, '\n', false); - m_DataBuffer->Clear(); - HttpRequest req(stream); req.RequestMethod = "POST"; req.RequestUrl = url; @@ -394,16 +478,18 @@ void InfluxdbWriter::Flush(void) try { req.WriteBody(body.CStr(), body.GetLength()); req.Finish(); - } catch (const std::exception&) { + } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - return; + throw ex; } + //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options. HttpResponse resp(stream, req); StreamReadContext context; struct timeval timeout = { GetSocketTimeout(), 0 }; + if (!socket->Poll(true, false, &timeout)) { Log(LogWarning, "InfluxdbWriter") << "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; @@ -412,10 +498,10 @@ void InfluxdbWriter::Flush(void) try { resp.Parse(context, true); - } catch (const std::exception&) { + } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; - return; + throw ex; } if (resp.StatusCode != 204) { @@ -424,6 +510,20 @@ void InfluxdbWriter::Flush(void) } } +void InfluxdbWriter::IncreaseTaskCount(void) +{ + double now = Utility::GetTime(); + + boost::mutex::scoped_lock lock(m_StatsMutex); + m_TaskStats.InsertValue(now, 1); +} + +int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const +{ + boost::mutex::scoped_lock lock(m_StatsMutex); + return m_TaskStats.GetValues(span); +} + void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) { ObjectImpl::ValidateHostTemplate(value, utils); diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index d7fbe6e18..3b5d18389 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -25,6 +25,9 @@ #include "base/configobject.hpp" #include "base/tcpsocket.hpp" #include "base/timer.hpp" +#include "base/ringbuffer.hpp" +#include "base/workqueue.hpp" +#include #include namespace icinga @@ -41,18 +44,35 @@ public: DECLARE_OBJECT(InfluxdbWriter); DECLARE_OBJECTNAME(InfluxdbWriter); + InfluxdbWriter(void); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + int GetTaskCount(RingBuffer::SizeType span) const; + virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override; virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override; protected: + virtual void OnConfigLoaded(void) override; virtual void Start(bool runtimeCreated) override; virtual void Stop(bool runtimeRemoved) override; + void IncreaseTaskCount(void); + private: + WorkQueue m_WorkQueue; Timer::Ptr m_FlushTimer; - Array::Ptr m_DataBuffer; + std::vector m_DataBuffer; + boost::mutex m_DataBufferMutex; + + mutable boost::mutex m_StatsMutex; + RingBuffer m_TaskStats; + int m_PendingTasks; + double m_PendingTasksTimestamp; + + Timer::Ptr m_StatsLoggerTimer; + void StatsLoggerTimerHandler(void); void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts); @@ -60,6 +80,8 @@ private: void FlushTimeout(void); void Flush(void); + void FlushHandler(const String& body); + static String FormatInteger(const int val); static String FormatBoolean(const bool val); @@ -67,6 +89,10 @@ private: static String EscapeField(const String& str); Stream::Ptr Connect(TcpSocket::Ptr& socket); + + void AssertOnWorkQueue(void); + + void ExceptionHandler(boost::exception_ptr exp); }; }