InfluxdbWriter: Use a work queue for async message processing; add stats log/api

This commit is contained in:
Michael Friedrich 2017-05-04 10:29:49 +02:00
parent f9e0fd2e3e
commit 3649a5a0d7
4 changed files with 184 additions and 48 deletions

View File

@ -886,6 +886,10 @@ Example:
host = "127.0.0.1" host = "127.0.0.1"
port = 8086 port = 8086
database = "icinga2" database = "icinga2"
flush_threshold = 1024
flush_interval = 10s
host_template = { host_template = {
measurement = "$host.check_command$" measurement = "$host.check_command$"
tags = { tags = {
@ -935,6 +939,10 @@ Configuration Attributes:
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
socket_timeout | **Optional.** How long to wait for InfluxDB to respond. Defaults to `5s`. socket_timeout | **Optional.** How long to wait for InfluxDB to respond. Defaults to `5s`.
Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
or similar.
### <a id="objecttype-influxdbwriter-instance-tags"></a> Instance Tagging ### <a id="objecttype-influxdbwriter-instance-tags"></a> Instance Tagging
Consider the following service check: Consider the following service check:

View File

@ -9,6 +9,8 @@ object InfluxdbWriter "influxdb" {
//host = "127.0.0.1" //host = "127.0.0.1"
//port = 8086 //port = 8086
//database = "icinga2" //database = "icinga2"
//flush_threshold = 1024
//flush_interval = 10s
//host_template = { //host_template = {
// measurement = "$host.check_command$" // measurement = "$host.check_command$"
// tags = { // tags = {

View File

@ -25,7 +25,6 @@
#include "icinga/service.hpp" #include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp" #include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp" #include "icinga/icingaapplication.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/perfdatavalue.hpp" #include "icinga/perfdatavalue.hpp"
#include "icinga/checkcommand.hpp" #include "icinga/checkcommand.hpp"
#include "base/tcpsocket.hpp" #include "base/tcpsocket.hpp"
@ -34,7 +33,6 @@
#include "base/logger.hpp" #include "base/logger.hpp"
#include "base/convert.hpp" #include "base/convert.hpp"
#include "base/utility.hpp" #include "base/utility.hpp"
#include "base/application.hpp"
#include "base/stream.hpp" #include "base/stream.hpp"
#include "base/networkstream.hpp" #include "base/networkstream.hpp"
#include "base/exception.hpp" #include "base/exception.hpp"
@ -52,12 +50,32 @@ REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
InfluxdbWriter::InfluxdbWriter(void)
: m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
{ }
void InfluxdbWriter::OnConfigLoaded(void)
{
ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{ {
Dictionary::Ptr nodes = new Dictionary(); Dictionary::Ptr nodes = new Dictionary();
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) { for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
nodes->Set(influxdbwriter->GetName(), 1); //add more stats size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
//TODO: Collect more stats
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("data_buffer_items", dataBufferItems);
nodes->Set(influxdbwriter->GetName(), stats);
} }
status->Set("influxdbwriter", nodes); status->Set("influxdbwriter", nodes);
@ -65,19 +83,28 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
void InfluxdbWriter::Start(bool runtimeCreated) void InfluxdbWriter::Start(bool runtimeCreated)
{ {
m_DataBuffer = new Array();
ObjectImpl<InfluxdbWriter>::Start(runtimeCreated); ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
Log(LogInformation, "InfluxdbWriter") Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' started."; << "'" << GetName() << "' started.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer(); m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval()); m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this)); m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
m_FlushTimer->Start(); m_FlushTimer->Start();
m_FlushTimer->Reschedule(0); m_FlushTimer->Reschedule(0);
/* Timer for updating and logging work queue stats */
m_StatsLoggerTimer = new Timer();
m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
m_StatsLoggerTimer->Start();
/* Register for new metrics. */
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
} }
@ -86,9 +113,54 @@ void InfluxdbWriter::Stop(bool runtimeRemoved)
Log(LogInformation, "InfluxdbWriter") Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' stopped."; << "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved); ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
} }
void InfluxdbWriter::AssertOnWorkQueue(void)
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
Log(LogDebug, "InfluxdbWriter")
<< "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
//TODO: Close the connection, if we keep it open.
}
void InfluxdbWriter::StatsLoggerTimerHandler(void)
{
int pending = m_WorkQueue.GetLength();
double now = Utility::GetTime();
double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
double timeToZero = pending / gradient;
String timeInfo;
if (pending > GetTaskCount(5)) {
timeInfo = " empty in ";
if (timeToZero < 0)
timeInfo += "infinite time, your backend isn't able to keep up";
else
timeInfo += Utility::FormatDuration(timeToZero);
}
m_PendingTasks = pending;
m_PendingTasksTimestamp = now;
Log(LogInformation, "InfluxdbWriter")
<< "Work queue items: " << pending
<< ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
<< " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
<< timeInfo;
}
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket) Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
{ {
socket = new TcpSocket(); socket = new TcpSocket();
@ -98,32 +170,32 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
try { try {
socket->Connect(GetHost(), GetPort()); socket->Connect(GetHost(), GetPort());
} catch (std::exception&) { } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
return Stream::Ptr(); throw ex;
} }
if (GetSslEnable()) { if (GetSslEnable()) {
boost::shared_ptr<SSL_CTX> ssl_context; boost::shared_ptr<SSL_CTX> sslContext;
try { try {
ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert()); sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (std::exception&) { } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "Unable to create SSL context."; << "Unable to create SSL context.";
return Stream::Ptr(); throw ex;
} }
TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context); TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
try { try {
tls_stream->Handshake(); tlsStream->Handshake();
} catch (std::exception&) { } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "TLS handshake with host '" << GetHost() << "' failed."; << "TLS handshake with host '" << GetHost() << "' failed.";
return Stream::Ptr(); throw ex;
} }
return tls_stream; return tlsStream;
} else { } else {
return new NetworkStream(socket); return new NetworkStream(socket);
} }
@ -176,7 +248,7 @@ String InfluxdbWriter::FormatInteger(const int val)
String InfluxdbWriter::FormatBoolean(const bool val) String InfluxdbWriter::FormatBoolean(const bool val)
{ {
return val ? "true" : "false"; return String(val);
} }
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts) void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
@ -265,6 +337,8 @@ String InfluxdbWriter::EscapeKey(const String& str)
String InfluxdbWriter::EscapeField(const String& str) String InfluxdbWriter::EscapeField(const String& str)
{ {
//TODO: Evaluate whether boost::regex is really needed here.
// Handle integers // Handle integers
boost::regex integer("-?\\d+i"); boost::regex integer("-?\\d+i");
if (boost::regex_match(str.GetData(), integer)) { if (boost::regex_match(str.GetData(), integer)) {
@ -313,29 +387,33 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label
msgbuf << " "; msgbuf << " ";
bool first = true; {
ObjectLock fieldLock(fields); bool first = true;
for (const Dictionary::Pair& pair : fields) {
if (first) ObjectLock fieldLock(fields);
first = false; for (const Dictionary::Pair& pair : fields) {
else if (first)
msgbuf << ","; first = false;
msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second); else
msgbuf << ",";
msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
}
} }
msgbuf << " " << static_cast<unsigned long>(ts); msgbuf << " " << static_cast<unsigned long>(ts);
Log(LogDebug, "InfluxdbWriter") Log(LogDebug, "InfluxdbWriter")
<< "Add to metric list:'" << msgbuf.str() << "'."; << "Add to metric list: '" << msgbuf.str() << "'.";
// Atomically buffer the data point // Atomically buffer the data point
ObjectLock olock(m_DataBuffer); boost::mutex::scoped_lock lock(m_DataBufferMutex);
m_DataBuffer->Add(String(msgbuf.str())); m_DataBuffer.push_back(String(msgbuf.str()));
// Flush if we've buffered too much to prevent excessive memory use // Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer->GetLength()) >= GetFlushThreshold()) { if (m_DataBuffer.size() >= GetFlushThreshold()) {
Log(LogDebug, "InfluxdbWriter") Log(LogDebug, "InfluxdbWriter")
<< "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points"; << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush(); Flush();
} }
} }
@ -344,27 +422,38 @@ void InfluxdbWriter::FlushTimeout(void)
{ {
// Prevent new data points from being added to the array, there is a // Prevent new data points from being added to the array, there is a
// race condition where they could disappear // race condition where they could disappear
ObjectLock olock(m_DataBuffer); boost::mutex::scoped_lock lock(m_DataBufferMutex);
// Flush if there are any data available // Flush if there are any data available
if (m_DataBuffer->GetLength() > 0) { if (m_DataBuffer.size() > 0) {
Log(LogDebug, "InfluxdbWriter") Log(LogDebug, "InfluxdbWriter")
<< "Timer expired writing " << m_DataBuffer->GetLength() << " data points"; << "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush(); Flush();
} }
} }
void InfluxdbWriter::Flush(void) void InfluxdbWriter::Flush(void)
{ {
// Ensure you hold a lock against m_DataBuffer so that things
// don't go missing after creating the body and clearing the buffer
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
// Asynchronously flush the metric body to InfluxDB
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
}
void InfluxdbWriter::FlushHandler(const String& body)
{
AssertOnWorkQueue();
TcpSocket::Ptr socket; TcpSocket::Ptr socket;
Stream::Ptr stream = Connect(socket); Stream::Ptr stream = Connect(socket);
// Unable to connect, play it safe and lose the data points if (!stream)
// to avoid a memory leak
if (!stream.get()) {
m_DataBuffer->Clear();
return; return;
}
IncreaseTaskCount();
Url::Ptr url = new Url(); Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http"); url->SetScheme(GetSslEnable() ? "https" : "http");
@ -382,11 +471,6 @@ void InfluxdbWriter::Flush(void)
if (!GetPassword().IsEmpty()) if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword()); url->AddQueryElement("p", GetPassword());
// Ensure you hold a lock against m_DataBuffer so that things
// don't go missing after creating the body and clearing the buffer
String body = Utility::Join(m_DataBuffer, '\n', false);
m_DataBuffer->Clear();
HttpRequest req(stream); HttpRequest req(stream);
req.RequestMethod = "POST"; req.RequestMethod = "POST";
req.RequestUrl = url; req.RequestUrl = url;
@ -394,16 +478,18 @@ void InfluxdbWriter::Flush(void)
try { try {
req.WriteBody(body.CStr(), body.GetLength()); req.WriteBody(body.CStr(), body.GetLength());
req.Finish(); req.Finish();
} catch (const std::exception&) { } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
return; throw ex;
} }
//TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
HttpResponse resp(stream, req); HttpResponse resp(stream, req);
StreamReadContext context; StreamReadContext context;
struct timeval timeout = { GetSocketTimeout(), 0 }; struct timeval timeout = { GetSocketTimeout(), 0 };
if (!socket->Poll(true, false, &timeout)) { if (!socket->Poll(true, false, &timeout)) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; << "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
@ -412,10 +498,10 @@ void InfluxdbWriter::Flush(void)
try { try {
resp.Parse(context, true); resp.Parse(context, true);
} catch (const std::exception&) { } catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter") Log(LogWarning, "InfluxdbWriter")
<< "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'."; << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
return; throw ex;
} }
if (resp.StatusCode != 204) { if (resp.StatusCode != 204) {
@ -424,6 +510,20 @@ void InfluxdbWriter::Flush(void)
} }
} }
void InfluxdbWriter::IncreaseTaskCount(void)
{
double now = Utility::GetTime();
boost::mutex::scoped_lock lock(m_StatsMutex);
m_TaskStats.InsertValue(now, 1);
}
int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
{
boost::mutex::scoped_lock lock(m_StatsMutex);
return m_TaskStats.GetValues(span);
}
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
{ {
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils); ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);

View File

@ -25,6 +25,9 @@
#include "base/configobject.hpp" #include "base/configobject.hpp"
#include "base/tcpsocket.hpp" #include "base/tcpsocket.hpp"
#include "base/timer.hpp" #include "base/timer.hpp"
#include "base/ringbuffer.hpp"
#include "base/workqueue.hpp"
#include <boost/thread/mutex.hpp>
#include <fstream> #include <fstream>
namespace icinga namespace icinga
@ -41,18 +44,35 @@ public:
DECLARE_OBJECT(InfluxdbWriter); DECLARE_OBJECT(InfluxdbWriter);
DECLARE_OBJECTNAME(InfluxdbWriter); DECLARE_OBJECTNAME(InfluxdbWriter);
InfluxdbWriter(void);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
int GetTaskCount(RingBuffer::SizeType span) const;
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override; virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override; virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
protected: protected:
virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override; virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override; virtual void Stop(bool runtimeRemoved) override;
void IncreaseTaskCount(void);
private: private:
WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer; Timer::Ptr m_FlushTimer;
Array::Ptr m_DataBuffer; std::vector<String> m_DataBuffer;
boost::mutex m_DataBufferMutex;
mutable boost::mutex m_StatsMutex;
RingBuffer m_TaskStats;
int m_PendingTasks;
double m_PendingTasksTimestamp;
Timer::Ptr m_StatsLoggerTimer;
void StatsLoggerTimerHandler(void);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts); void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
@ -60,6 +80,8 @@ private:
void FlushTimeout(void); void FlushTimeout(void);
void Flush(void); void Flush(void);
void FlushHandler(const String& body);
static String FormatInteger(const int val); static String FormatInteger(const int val);
static String FormatBoolean(const bool val); static String FormatBoolean(const bool val);
@ -67,6 +89,10 @@ private:
static String EscapeField(const String& str); static String EscapeField(const String& str);
Stream::Ptr Connect(TcpSocket::Ptr& socket); Stream::Ptr Connect(TcpSocket::Ptr& socket);
void AssertOnWorkQueue(void);
void ExceptionHandler(boost::exception_ptr exp);
}; };
} }