diff --git a/doc/9-object-types.md b/doc/9-object-types.md
index f16d09fab..73ab4fdd4 100644
--- a/doc/9-object-types.md
+++ b/doc/9-object-types.md
@@ -886,6 +886,10 @@ Example:
host = "127.0.0.1"
port = 8086
database = "icinga2"
+
+ flush_threshold = 1024
+ flush_interval = 10s
+
host_template = {
measurement = "$host.check_command$"
tags = {
@@ -935,6 +939,10 @@ Configuration Attributes:
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
socket_timeout | **Optional.** How long to wait for InfluxDB to respond. Defaults to `5s`.
+Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
+to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
+or similar.
+
### Instance Tagging
Consider the following service check:
diff --git a/etc/icinga2/features-available/influxdb.conf b/etc/icinga2/features-available/influxdb.conf
index 058568bf1..20f9ed253 100644
--- a/etc/icinga2/features-available/influxdb.conf
+++ b/etc/icinga2/features-available/influxdb.conf
@@ -9,6 +9,8 @@ object InfluxdbWriter "influxdb" {
//host = "127.0.0.1"
//port = 8086
//database = "icinga2"
+ //flush_threshold = 1024
+ //flush_interval = 10s
//host_template = {
// measurement = "$host.check_command$"
// tags = {
diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp
index 198d0658f..a8e63d235 100644
--- a/lib/perfdata/influxdbwriter.cpp
+++ b/lib/perfdata/influxdbwriter.cpp
@@ -25,7 +25,6 @@
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
-#include "icinga/compatutility.hpp"
#include "icinga/perfdatavalue.hpp"
#include "icinga/checkcommand.hpp"
#include "base/tcpsocket.hpp"
@@ -34,7 +33,6 @@
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
-#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
@@ -52,12 +50,32 @@ REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
+//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
+InfluxdbWriter::InfluxdbWriter(void)
+ : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
+{ }
+
+void InfluxdbWriter::OnConfigLoaded(void)
+{
+ ObjectImpl::OnConfigLoaded();
+
+ m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
+}
+
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
Dictionary::Ptr nodes = new Dictionary();
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) {
- nodes->Set(influxdbwriter->GetName(), 1); //add more stats
+ size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
+ size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
+
+ //TODO: Collect more stats
+ Dictionary::Ptr stats = new Dictionary();
+ stats->Set("work_queue_items", workQueueItems);
+ stats->Set("data_buffer_items", dataBufferItems);
+
+ nodes->Set(influxdbwriter->GetName(), stats);
}
status->Set("influxdbwriter", nodes);
@@ -65,19 +83,28 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
void InfluxdbWriter::Start(bool runtimeCreated)
{
- m_DataBuffer = new Array();
-
ObjectImpl::Start(runtimeCreated);
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' started.";
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
+
+ /* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
+ /* Timer for updating and logging work queue stats */
+ m_StatsLoggerTimer = new Timer();
+ m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
+ m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
+ m_StatsLoggerTimer->Start();
+
+ /* Register for new metrics. */
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}
@@ -86,9 +113,54 @@ void InfluxdbWriter::Stop(bool runtimeRemoved)
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' stopped.";
+ m_WorkQueue.Join();
+
ObjectImpl::Stop(runtimeRemoved);
}
+void InfluxdbWriter::AssertOnWorkQueue(void)
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "InfluxdbWriter")
+ << "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
+
+ //TODO: Close the connection, if we keep it open.
+}
+
+void InfluxdbWriter::StatsLoggerTimerHandler(void)
+{
+ int pending = m_WorkQueue.GetLength();
+
+ double now = Utility::GetTime();
+ double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
+ double timeToZero = pending / gradient;
+
+ String timeInfo;
+
+ if (pending > GetTaskCount(5)) {
+ timeInfo = " empty in ";
+ if (timeToZero < 0)
+ timeInfo += "infinite time, your backend isn't able to keep up";
+ else
+ timeInfo += Utility::FormatDuration(timeToZero);
+ }
+
+ m_PendingTasks = pending;
+ m_PendingTasksTimestamp = now;
+
+ Log(LogInformation, "InfluxdbWriter")
+ << "Work queue items: " << pending
+ << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
+ << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
+ << timeInfo;
+}
+
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
{
socket = new TcpSocket();
@@ -98,32 +170,32 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
try {
socket->Connect(GetHost(), GetPort());
- } catch (std::exception&) {
+ } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
- return Stream::Ptr();
+ throw ex;
}
if (GetSslEnable()) {
- boost::shared_ptr ssl_context;
+ boost::shared_ptr sslContext;
try {
- ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
- } catch (std::exception&) {
+ sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+ } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to create SSL context.";
- return Stream::Ptr();
+ throw ex;
}
- TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context);
+ TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
try {
- tls_stream->Handshake();
- } catch (std::exception&) {
+ tlsStream->Handshake();
+ } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "TLS handshake with host '" << GetHost() << "' failed.";
- return Stream::Ptr();
+ throw ex;
}
- return tls_stream;
+ return tlsStream;
} else {
return new NetworkStream(socket);
}
@@ -176,7 +248,7 @@ String InfluxdbWriter::FormatInteger(const int val)
String InfluxdbWriter::FormatBoolean(const bool val)
{
- return val ? "true" : "false";
+ return String(val);
}
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
@@ -265,6 +337,8 @@ String InfluxdbWriter::EscapeKey(const String& str)
String InfluxdbWriter::EscapeField(const String& str)
{
+ //TODO: Evaluate whether boost::regex is really needed here.
+
// Handle integers
boost::regex integer("-?\\d+i");
if (boost::regex_match(str.GetData(), integer)) {
@@ -313,29 +387,33 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label
msgbuf << " ";
- bool first = true;
- ObjectLock fieldLock(fields);
- for (const Dictionary::Pair& pair : fields) {
- if (first)
- first = false;
- else
- msgbuf << ",";
- msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
+ {
+ bool first = true;
+
+ ObjectLock fieldLock(fields);
+ for (const Dictionary::Pair& pair : fields) {
+ if (first)
+ first = false;
+ else
+ msgbuf << ",";
+
+ msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
+ }
}
msgbuf << " " << static_cast(ts);
Log(LogDebug, "InfluxdbWriter")
- << "Add to metric list:'" << msgbuf.str() << "'.";
+ << "Add to metric list: '" << msgbuf.str() << "'.";
// Atomically buffer the data point
- ObjectLock olock(m_DataBuffer);
- m_DataBuffer->Add(String(msgbuf.str()));
+ boost::mutex::scoped_lock lock(m_DataBufferMutex);
+ m_DataBuffer.push_back(String(msgbuf.str()));
// Flush if we've buffered too much to prevent excessive memory use
- if (static_cast(m_DataBuffer->GetLength()) >= GetFlushThreshold()) {
+ if (m_DataBuffer.size() >= GetFlushThreshold()) {
Log(LogDebug, "InfluxdbWriter")
- << "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points";
+ << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
@@ -344,27 +422,38 @@ void InfluxdbWriter::FlushTimeout(void)
{
// Prevent new data points from being added to the array, there is a
// race condition where they could disappear
- ObjectLock olock(m_DataBuffer);
+ boost::mutex::scoped_lock lock(m_DataBufferMutex);
// Flush if there are any data available
- if (m_DataBuffer->GetLength() > 0) {
+ if (m_DataBuffer.size() > 0) {
Log(LogDebug, "InfluxdbWriter")
- << "Timer expired writing " << m_DataBuffer->GetLength() << " data points";
+ << "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void InfluxdbWriter::Flush(void)
{
+ // Ensure you hold a lock against m_DataBuffer so that things
+ // don't go missing after creating the body and clearing the buffer
+ String body = boost::algorithm::join(m_DataBuffer, "\n");
+ m_DataBuffer.clear();
+
+ // Asynchronously flush the metric body to InfluxDB
+ m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
+}
+
+void InfluxdbWriter::FlushHandler(const String& body)
+{
+ AssertOnWorkQueue();
+
TcpSocket::Ptr socket;
Stream::Ptr stream = Connect(socket);
- // Unable to connect, play it safe and lose the data points
- // to avoid a memory leak
- if (!stream.get()) {
- m_DataBuffer->Clear();
+ if (!stream)
return;
- }
+
+ IncreaseTaskCount();
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
@@ -382,11 +471,6 @@ void InfluxdbWriter::Flush(void)
if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword());
- // Ensure you hold a lock against m_DataBuffer so that things
- // don't go missing after creating the body and clearing the buffer
- String body = Utility::Join(m_DataBuffer, '\n', false);
- m_DataBuffer->Clear();
-
HttpRequest req(stream);
req.RequestMethod = "POST";
req.RequestUrl = url;
@@ -394,16 +478,18 @@ void InfluxdbWriter::Flush(void)
try {
req.WriteBody(body.CStr(), body.GetLength());
req.Finish();
- } catch (const std::exception&) {
+ } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
- return;
+ throw ex;
}
+ //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
HttpResponse resp(stream, req);
StreamReadContext context;
struct timeval timeout = { GetSocketTimeout(), 0 };
+
if (!socket->Poll(true, false, &timeout)) {
Log(LogWarning, "InfluxdbWriter")
<< "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
@@ -412,10 +498,10 @@ void InfluxdbWriter::Flush(void)
try {
resp.Parse(context, true);
- } catch (const std::exception&) {
+ } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
- return;
+ throw ex;
}
if (resp.StatusCode != 204) {
@@ -424,6 +510,20 @@ void InfluxdbWriter::Flush(void)
}
}
+void InfluxdbWriter::IncreaseTaskCount(void)
+{
+ double now = Utility::GetTime();
+
+ boost::mutex::scoped_lock lock(m_StatsMutex);
+ m_TaskStats.InsertValue(now, 1);
+}
+
+int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
+{
+ boost::mutex::scoped_lock lock(m_StatsMutex);
+ return m_TaskStats.GetValues(span);
+}
+
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
{
ObjectImpl::ValidateHostTemplate(value, utils);
diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp
index d7fbe6e18..3b5d18389 100644
--- a/lib/perfdata/influxdbwriter.hpp
+++ b/lib/perfdata/influxdbwriter.hpp
@@ -25,6 +25,9 @@
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
+#include "base/ringbuffer.hpp"
+#include "base/workqueue.hpp"
+#include
#include
namespace icinga
@@ -41,18 +44,35 @@ public:
DECLARE_OBJECT(InfluxdbWriter);
DECLARE_OBJECTNAME(InfluxdbWriter);
+ InfluxdbWriter(void);
+
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+ int GetTaskCount(RingBuffer::SizeType span) const;
+
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
protected:
+ virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
+ void IncreaseTaskCount(void);
+
private:
+ WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer;
- Array::Ptr m_DataBuffer;
+ std::vector m_DataBuffer;
+ boost::mutex m_DataBufferMutex;
+
+ mutable boost::mutex m_StatsMutex;
+ RingBuffer m_TaskStats;
+ int m_PendingTasks;
+ double m_PendingTasksTimestamp;
+
+ Timer::Ptr m_StatsLoggerTimer;
+ void StatsLoggerTimerHandler(void);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
@@ -60,6 +80,8 @@ private:
void FlushTimeout(void);
void Flush(void);
+ void FlushHandler(const String& body);
+
static String FormatInteger(const int val);
static String FormatBoolean(const bool val);
@@ -67,6 +89,10 @@ private:
static String EscapeField(const String& str);
Stream::Ptr Connect(TcpSocket::Ptr& socket);
+
+ void AssertOnWorkQueue(void);
+
+ void ExceptionHandler(boost::exception_ptr exp);
};
}