mirror of https://github.com/Icinga/icinga2.git
Merge pull request #5232 from Icinga/fix/influxdb-lock-5219
InfluxdbWriter: Use a work queue for async message processing; add stats log/api refs #5219
This commit is contained in:
commit
deb7bbd331
|
@ -886,6 +886,10 @@ Example:
|
|||
host = "127.0.0.1"
|
||||
port = 8086
|
||||
database = "icinga2"
|
||||
|
||||
flush_threshold = 1024
|
||||
flush_interval = 10s
|
||||
|
||||
host_template = {
|
||||
measurement = "$host.check_command$"
|
||||
tags = {
|
||||
|
@ -935,6 +939,10 @@ Configuration Attributes:
|
|||
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
|
||||
socket_timeout | **Optional.** How long to wait for InfluxDB to respond. Defaults to `5s`.
|
||||
|
||||
Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
|
||||
to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
|
||||
or similar.
|
||||
|
||||
### <a id="objecttype-influxdbwriter-instance-tags"></a> Instance Tagging
|
||||
|
||||
Consider the following service check:
|
||||
|
|
|
@ -9,6 +9,8 @@ object InfluxdbWriter "influxdb" {
|
|||
//host = "127.0.0.1"
|
||||
//port = 8086
|
||||
//database = "icinga2"
|
||||
//flush_threshold = 1024
|
||||
//flush_interval = 10s
|
||||
//host_template = {
|
||||
// measurement = "$host.check_command$"
|
||||
// tags = {
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#include "icinga/service.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
#include "icinga/perfdatavalue.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
|
@ -34,7 +33,6 @@
|
|||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
|
@ -52,12 +50,32 @@ REGISTER_TYPE(InfluxdbWriter);
|
|||
|
||||
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
|
||||
|
||||
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
|
||||
InfluxdbWriter::InfluxdbWriter(void)
|
||||
: m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
|
||||
{ }
|
||||
|
||||
void InfluxdbWriter::OnConfigLoaded(void)
|
||||
{
|
||||
ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
|
||||
|
||||
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
|
||||
}
|
||||
|
||||
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
|
||||
{
|
||||
Dictionary::Ptr nodes = new Dictionary();
|
||||
|
||||
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
|
||||
nodes->Set(influxdbwriter->GetName(), 1); //add more stats
|
||||
size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
|
||||
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
|
||||
|
||||
//TODO: Collect more stats
|
||||
Dictionary::Ptr stats = new Dictionary();
|
||||
stats->Set("work_queue_items", workQueueItems);
|
||||
stats->Set("data_buffer_items", dataBufferItems);
|
||||
|
||||
nodes->Set(influxdbwriter->GetName(), stats);
|
||||
}
|
||||
|
||||
status->Set("influxdbwriter", nodes);
|
||||
|
@ -65,19 +83,28 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
|
|||
|
||||
void InfluxdbWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
m_DataBuffer = new Array();
|
||||
|
||||
ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
|
||||
|
||||
Log(LogInformation, "InfluxdbWriter")
|
||||
<< "'" << GetName() << "' started.";
|
||||
|
||||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
|
||||
|
||||
/* Setup timer for periodically flushing m_DataBuffer */
|
||||
m_FlushTimer = new Timer();
|
||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||
m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
|
||||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
/* Timer for updating and logging work queue stats */
|
||||
m_StatsLoggerTimer = new Timer();
|
||||
m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
|
||||
m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
|
||||
m_StatsLoggerTimer->Start();
|
||||
|
||||
/* Register for new metrics. */
|
||||
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||
}
|
||||
|
||||
|
@ -86,9 +113,54 @@ void InfluxdbWriter::Stop(bool runtimeRemoved)
|
|||
Log(LogInformation, "InfluxdbWriter")
|
||||
<< "'" << GetName() << "' stopped.";
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
|
||||
}
|
||||
|
||||
void InfluxdbWriter::AssertOnWorkQueue(void)
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
}
|
||||
|
||||
void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
|
||||
{
|
||||
Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
|
||||
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
|
||||
|
||||
//TODO: Close the connection, if we keep it open.
|
||||
}
|
||||
|
||||
void InfluxdbWriter::StatsLoggerTimerHandler(void)
|
||||
{
|
||||
int pending = m_WorkQueue.GetLength();
|
||||
|
||||
double now = Utility::GetTime();
|
||||
double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
|
||||
double timeToZero = pending / gradient;
|
||||
|
||||
String timeInfo;
|
||||
|
||||
if (pending > GetTaskCount(5)) {
|
||||
timeInfo = " empty in ";
|
||||
if (timeToZero < 0)
|
||||
timeInfo += "infinite time, your backend isn't able to keep up";
|
||||
else
|
||||
timeInfo += Utility::FormatDuration(timeToZero);
|
||||
}
|
||||
|
||||
m_PendingTasks = pending;
|
||||
m_PendingTasksTimestamp = now;
|
||||
|
||||
Log(LogInformation, "InfluxdbWriter")
|
||||
<< "Work queue items: " << pending
|
||||
<< ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
|
||||
<< " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
|
||||
<< timeInfo;
|
||||
}
|
||||
|
||||
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
|
||||
{
|
||||
socket = new TcpSocket();
|
||||
|
@ -98,32 +170,32 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
|
|||
|
||||
try {
|
||||
socket->Connect(GetHost(), GetPort());
|
||||
} catch (std::exception&) {
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
return Stream::Ptr();
|
||||
throw ex;
|
||||
}
|
||||
|
||||
if (GetSslEnable()) {
|
||||
boost::shared_ptr<SSL_CTX> ssl_context;
|
||||
boost::shared_ptr<SSL_CTX> sslContext;
|
||||
try {
|
||||
ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (std::exception&) {
|
||||
sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
return Stream::Ptr();
|
||||
throw ex;
|
||||
}
|
||||
|
||||
TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context);
|
||||
TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
|
||||
try {
|
||||
tls_stream->Handshake();
|
||||
} catch (std::exception&) {
|
||||
tlsStream->Handshake();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
||||
return Stream::Ptr();
|
||||
throw ex;
|
||||
}
|
||||
|
||||
return tls_stream;
|
||||
return tlsStream;
|
||||
} else {
|
||||
return new NetworkStream(socket);
|
||||
}
|
||||
|
@ -176,7 +248,7 @@ String InfluxdbWriter::FormatInteger(const int val)
|
|||
|
||||
String InfluxdbWriter::FormatBoolean(const bool val)
|
||||
{
|
||||
return val ? "true" : "false";
|
||||
return String(val);
|
||||
}
|
||||
|
||||
void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
|
||||
|
@ -265,6 +337,8 @@ String InfluxdbWriter::EscapeKey(const String& str)
|
|||
|
||||
String InfluxdbWriter::EscapeField(const String& str)
|
||||
{
|
||||
//TODO: Evaluate whether boost::regex is really needed here.
|
||||
|
||||
// Handle integers
|
||||
boost::regex integer("-?\\d+i");
|
||||
if (boost::regex_match(str.GetData(), integer)) {
|
||||
|
@ -313,29 +387,33 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label
|
|||
|
||||
msgbuf << " ";
|
||||
|
||||
{
|
||||
bool first = true;
|
||||
|
||||
ObjectLock fieldLock(fields);
|
||||
for (const Dictionary::Pair& pair : fields) {
|
||||
if (first)
|
||||
first = false;
|
||||
else
|
||||
msgbuf << ",";
|
||||
|
||||
msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
|
||||
}
|
||||
}
|
||||
|
||||
msgbuf << " " << static_cast<unsigned long>(ts);
|
||||
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Add to metric list:'" << msgbuf.str() << "'.";
|
||||
<< "Add to metric list: '" << msgbuf.str() << "'.";
|
||||
|
||||
// Atomically buffer the data point
|
||||
ObjectLock olock(m_DataBuffer);
|
||||
m_DataBuffer->Add(String(msgbuf.str()));
|
||||
boost::mutex::scoped_lock lock(m_DataBufferMutex);
|
||||
m_DataBuffer.push_back(String(msgbuf.str()));
|
||||
|
||||
// Flush if we've buffered too much to prevent excessive memory use
|
||||
if (static_cast<int>(m_DataBuffer->GetLength()) >= GetFlushThreshold()) {
|
||||
if (m_DataBuffer.size() >= GetFlushThreshold()) {
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points";
|
||||
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
|
||||
Flush();
|
||||
}
|
||||
}
|
||||
|
@ -344,27 +422,38 @@ void InfluxdbWriter::FlushTimeout(void)
|
|||
{
|
||||
// Prevent new data points from being added to the array, there is a
|
||||
// race condition where they could disappear
|
||||
ObjectLock olock(m_DataBuffer);
|
||||
boost::mutex::scoped_lock lock(m_DataBufferMutex);
|
||||
|
||||
// Flush if there are any data available
|
||||
if (m_DataBuffer->GetLength() > 0) {
|
||||
if (m_DataBuffer.size() > 0) {
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Timer expired writing " << m_DataBuffer->GetLength() << " data points";
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
Flush();
|
||||
}
|
||||
}
|
||||
|
||||
void InfluxdbWriter::Flush(void)
|
||||
{
|
||||
// Ensure you hold a lock against m_DataBuffer so that things
|
||||
// don't go missing after creating the body and clearing the buffer
|
||||
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
||||
m_DataBuffer.clear();
|
||||
|
||||
// Asynchronously flush the metric body to InfluxDB
|
||||
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
|
||||
}
|
||||
|
||||
void InfluxdbWriter::FlushHandler(const String& body)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
TcpSocket::Ptr socket;
|
||||
Stream::Ptr stream = Connect(socket);
|
||||
|
||||
// Unable to connect, play it safe and lose the data points
|
||||
// to avoid a memory leak
|
||||
if (!stream.get()) {
|
||||
m_DataBuffer->Clear();
|
||||
if (!stream)
|
||||
return;
|
||||
}
|
||||
|
||||
IncreaseTaskCount();
|
||||
|
||||
Url::Ptr url = new Url();
|
||||
url->SetScheme(GetSslEnable() ? "https" : "http");
|
||||
|
@ -382,11 +471,6 @@ void InfluxdbWriter::Flush(void)
|
|||
if (!GetPassword().IsEmpty())
|
||||
url->AddQueryElement("p", GetPassword());
|
||||
|
||||
// Ensure you hold a lock against m_DataBuffer so that things
|
||||
// don't go missing after creating the body and clearing the buffer
|
||||
String body = Utility::Join(m_DataBuffer, '\n', false);
|
||||
m_DataBuffer->Clear();
|
||||
|
||||
HttpRequest req(stream);
|
||||
req.RequestMethod = "POST";
|
||||
req.RequestUrl = url;
|
||||
|
@ -394,16 +478,18 @@ void InfluxdbWriter::Flush(void)
|
|||
try {
|
||||
req.WriteBody(body.CStr(), body.GetLength());
|
||||
req.Finish();
|
||||
} catch (const std::exception&) {
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
return;
|
||||
throw ex;
|
||||
}
|
||||
|
||||
//TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
|
||||
HttpResponse resp(stream, req);
|
||||
StreamReadContext context;
|
||||
|
||||
struct timeval timeout = { GetSocketTimeout(), 0 };
|
||||
|
||||
if (!socket->Poll(true, false, &timeout)) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
@ -412,10 +498,10 @@ void InfluxdbWriter::Flush(void)
|
|||
|
||||
try {
|
||||
resp.Parse(context, true);
|
||||
} catch (const std::exception&) {
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "InfluxdbWriter")
|
||||
<< "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
return;
|
||||
throw ex;
|
||||
}
|
||||
|
||||
if (resp.StatusCode != 204) {
|
||||
|
@ -424,6 +510,20 @@ void InfluxdbWriter::Flush(void)
|
|||
}
|
||||
}
|
||||
|
||||
void InfluxdbWriter::IncreaseTaskCount(void)
|
||||
{
|
||||
double now = Utility::GetTime();
|
||||
|
||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||
m_TaskStats.InsertValue(now, 1);
|
||||
}
|
||||
|
||||
int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||
return m_TaskStats.GetValues(span);
|
||||
}
|
||||
|
||||
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
|
||||
{
|
||||
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
|
||||
|
|
|
@ -25,6 +25,9 @@
|
|||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/ringbuffer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
|
@ -41,18 +44,35 @@ public:
|
|||
DECLARE_OBJECT(InfluxdbWriter);
|
||||
DECLARE_OBJECTNAME(InfluxdbWriter);
|
||||
|
||||
InfluxdbWriter(void);
|
||||
|
||||
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
||||
|
||||
int GetTaskCount(RingBuffer::SizeType span) const;
|
||||
|
||||
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||
|
||||
protected:
|
||||
virtual void OnConfigLoaded(void) override;
|
||||
virtual void Start(bool runtimeCreated) override;
|
||||
virtual void Stop(bool runtimeRemoved) override;
|
||||
|
||||
void IncreaseTaskCount(void);
|
||||
|
||||
private:
|
||||
WorkQueue m_WorkQueue;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
Array::Ptr m_DataBuffer;
|
||||
std::vector<String> m_DataBuffer;
|
||||
boost::mutex m_DataBufferMutex;
|
||||
|
||||
mutable boost::mutex m_StatsMutex;
|
||||
RingBuffer m_TaskStats;
|
||||
int m_PendingTasks;
|
||||
double m_PendingTasksTimestamp;
|
||||
|
||||
Timer::Ptr m_StatsLoggerTimer;
|
||||
void StatsLoggerTimerHandler(void);
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
|
||||
|
@ -60,6 +80,8 @@ private:
|
|||
void FlushTimeout(void);
|
||||
void Flush(void);
|
||||
|
||||
void FlushHandler(const String& body);
|
||||
|
||||
static String FormatInteger(const int val);
|
||||
static String FormatBoolean(const bool val);
|
||||
|
||||
|
@ -67,6 +89,10 @@ private:
|
|||
static String EscapeField(const String& str);
|
||||
|
||||
Stream::Ptr Connect(TcpSocket::Ptr& socket);
|
||||
|
||||
void AssertOnWorkQueue(void);
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue