Merge pull request #5287 from Icinga/feature/workqueues-features

Use workqueues in Graphite and InfluxDB features

refs #5133
refs #5280
This commit is contained in:
Michael Friedrich 2017-05-26 17:17:00 +02:00 committed by GitHub
commit 803bbc4a72
8 changed files with 136 additions and 130 deletions

View File

@ -38,8 +38,7 @@ Timer::Ptr DbConnection::m_ProgramStatusTimer;
boost::once_flag DbConnection::m_OnceFlag = BOOST_ONCE_INIT;
DbConnection::DbConnection(void)
: m_IDCacheValid(false), m_QueryStats(15 * 60), m_PendingQueries(0),
m_PendingQueriesTimestamp(0), m_ActiveChangedHandler(false)
: m_IDCacheValid(false), m_QueryStats(15 * 60), m_ActiveChangedHandler(false)
{ }
void DbConnection::OnConfigLoaded(void)
@ -87,37 +86,6 @@ void DbConnection::EnableActiveChangedHandler(void)
}
}
void DbConnection::StatsLoggerTimerHandler(void)
{
if (!GetConnected())
return;
int pending = GetPendingQueryCount();
double now = Utility::GetTime();
double gradient = (pending - m_PendingQueries) / (now - m_PendingQueriesTimestamp);
double timeToZero = -pending / gradient;
String timeInfo;
if (pending > GetQueryCount(5)) {
timeInfo = " empty in ";
if (timeToZero < 0)
timeInfo += "infinite time, your database isn't able to keep up";
else
timeInfo += Utility::FormatDuration(timeToZero);
}
m_PendingQueries = pending;
m_PendingQueriesTimestamp = now;
Log(LogInformation, GetReflectionType()->GetName())
<< "Query queue items: " << pending
<< ", query rate: " << std::setw(2) << GetQueryCount(60) / 60.0 << "/s"
<< " (" << GetQueryCount(60) << "/min " << GetQueryCount(5 * 60) << "/5min " << GetQueryCount(15 * 60) << "/15min);"
<< timeInfo;
}
void DbConnection::Resume(void)
{
ConfigObject::Resume();
@ -129,11 +97,6 @@ void DbConnection::Resume(void)
m_CleanUpTimer->SetInterval(60);
m_CleanUpTimer->OnTimerExpired.connect(boost::bind(&DbConnection::CleanUpHandler, this));
m_CleanUpTimer->Start();
m_StatsLoggerTimer = new Timer();
m_StatsLoggerTimer->SetInterval(15);
m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&DbConnection::StatsLoggerTimerHandler, this));
m_StatsLoggerTimer->Start();
}
void DbConnection::Pause(void)

View File

@ -126,15 +126,10 @@ private:
static Timer::Ptr m_ProgramStatusTimer;
static boost::once_flag m_OnceFlag;
Timer::Ptr m_StatsLoggerTimer;
void StatsLoggerTimerHandler(void);
static void InsertRuntimeVariable(const String& key, const Value& value);
mutable boost::mutex m_StatsMutex;
RingBuffer m_QueryStats;
int m_PendingQueries;
double m_PendingQueriesTimestamp;
bool m_ActiveChangedHandler;
};

View File

@ -22,7 +22,6 @@
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
@ -46,12 +45,33 @@ REGISTER_TYPE(GraphiteWriter);
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
/**
 * Constructs the writer together with its work queue.
 *
 * NOTE(review): the two WorkQueue arguments are presumably the maximum number
 * of queued items and the number of worker threads - confirm against the
 * WorkQueue constructor.
 */
GraphiteWriter::GraphiteWriter(void)
	: m_WorkQueue(10000000, 1)
{
}
void GraphiteWriter::OnConfigLoaded(void)
{
ObjectImpl<GraphiteWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GraphiteWriter, " + GetName());
}
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
nodes->Set(graphitewriter->GetName(), 1); //add more stats
size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
nodes->Set(graphitewriter->GetName(), stats);
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("graphitewriter", nodes);
@ -64,6 +84,10 @@ void GraphiteWriter::Start(bool runtimeCreated)
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' started.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(boost::bind(&GraphiteWriter::ExceptionHandler, this, _1));
/* Timer for reconnecting */
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GraphiteWriter::ReconnectTimerHandler, this));
@ -78,12 +102,43 @@ void GraphiteWriter::Stop(bool runtimeRemoved)
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
ObjectImpl<GraphiteWriter>::Stop(runtimeRemoved);
}
void GraphiteWriter::ReconnectTimerHandler(void)
/**
 * Asserts that the caller runs on a work queue worker thread.
 *
 * The check is only performed once a stream object exists.
 */
void GraphiteWriter::AssertOnWorkQueue(void)
{
	if (!m_Stream)
		return;

	ASSERT(m_WorkQueue.IsWorkerThread());
}
/**
 * Handles an exception raised by a work queue task.
 *
 * Logs a critical message plus the diagnostic details at debug level, then
 * closes the stream and marks the writer as disconnected if it was connected.
 *
 * @param exp The exception that was caught.
 */
void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
{
	Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");

	Log(LogDebug, "GraphiteWriter")
	    << "Exception during Graphite operation: " << DiagnosticInformation(exp);

	/* Nothing to tear down when there is no active connection. */
	if (!GetConnected())
		return;

	m_Stream->Close();
	SetConnected(false);
}
void GraphiteWriter::Reconnect(void)
{
AssertOnWorkQueue();
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
SetShouldConnect(true);
bool reconnect = false;
if (GetConnected())
return;
TcpSocket::Ptr socket = new TcpSocket();
@ -93,17 +148,46 @@ void GraphiteWriter::ReconnectTimerHandler(void)
try {
socket->Connect(GetHost(), GetPort());
} catch (std::exception&) {
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
return;
throw ex;
}
m_Stream = new NetworkStream(socket);
SetConnected(true);
Log(LogInformation, "GraphiteWriter")
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
 * Timer callback: schedules Reconnect() on the work queue with normal
 * priority instead of reconnecting directly in the caller's thread.
 */
void GraphiteWriter::ReconnectTimerHandler(void)
{
	m_WorkQueue.Enqueue(
	    boost::bind(&GraphiteWriter::Reconnect, this),
	    PriorityNormal
	);
}
/**
 * Closes the stream and marks the writer as disconnected.
 *
 * Has no effect when the writer is not connected.
 */
void GraphiteWriter::Disconnect(void)
{
	AssertOnWorkQueue();

	if (GetConnected()) {
		m_Stream->Close();
		SetConnected(false);
	}
}
/**
 * Defers processing of a check result to the work queue.
 *
 * @param checkable The checkable the result belongs to.
 * @param cr The check result.
 */
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	m_WorkQueue.Enqueue(
	    boost::bind(&GraphiteWriter::InternalCheckResultHandler, this, checkable, cr)
	);
}
void GraphiteWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())

View File

@ -25,6 +25,7 @@
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <fstream>
namespace icinga
@ -41,21 +42,26 @@ public:
DECLARE_OBJECT(GraphiteWriter);
DECLARE_OBJECTNAME(GraphiteWriter);
GraphiteWriter(void);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
virtual void ValidateHostNameTemplate(const String& value, const ValidationUtils& utils) override;
virtual void ValidateServiceNameTemplate(const String& value, const ValidationUtils& utils) override;
protected:
virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
Stream::Ptr m_Stream;
WorkQueue m_WorkQueue;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const String& prefix, const String& name, double value, double ts);
void SendPerfdata(const String& prefix, const CheckResult::Ptr& cr, double ts);
static String EscapeMetric(const String& str, bool legacyMode = false);
@ -63,6 +69,13 @@ private:
static Value EscapeMacroMetric(const Value& value, bool legacyMode = false);
void ReconnectTimerHandler(void);
void Disconnect(void);
void Reconnect(void);
void AssertOnWorkQueue(void);
void ExceptionHandler(boost::exception_ptr exp);
};
}

View File

@ -42,6 +42,10 @@ class GraphiteWriter : ConfigObject
[config] bool enable_send_metadata;
[config] bool enable_legacy_mode;
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
};
}

View File

@ -43,6 +43,7 @@
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
@ -52,9 +53,8 @@ REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
InfluxdbWriter::InfluxdbWriter(void)
: m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
: m_WorkQueue(10000000, 1)
{ }
void InfluxdbWriter::OnConfigLoaded(void)
@ -64,7 +64,7 @@ void InfluxdbWriter::OnConfigLoaded(void)
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
@ -73,13 +73,16 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
//TODO: Collect more stats
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
stats->Set("data_buffer_items", dataBufferItems);
nodes->Set(influxdbwriter->GetName(), stats);
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems));
}
status->Set("influxdbwriter", nodes);
@ -102,12 +105,6 @@ void InfluxdbWriter::Start(bool runtimeCreated)
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Timer for updating and logging work queue stats */
m_StatsLoggerTimer = new Timer();
m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
m_StatsLoggerTimer->Start();
/* Register for new metrics. */
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}
@ -137,34 +134,6 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
//TODO: Close the connection, if we keep it open.
}
/**
 * Timer callback which logs the work queue length and task rates.
 *
 * When more tasks are pending than were counted for the last 5 seconds, an
 * estimate of the time needed to drain the queue is appended to the message.
 */
void InfluxdbWriter::StatsLoggerTimerHandler(void)
{
	int pending = m_WorkQueue.GetLength();

	double now = Utility::GetTime();

	/* Change of the queue length per second since the previous invocation. */
	double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);

	/* The negation is required: a shrinking queue has a negative gradient and
	 * must produce a positive drain time, while a growing (or stalled) queue
	 * must produce a negative value so that it is reported as "infinite time".
	 * This matches DbConnection::StatsLoggerTimerHandler. */
	double timeToZero = -pending / gradient;

	String timeInfo;

	if (pending > GetTaskCount(5)) {
		timeInfo = " empty in ";
		if (timeToZero < 0)
			timeInfo += "infinite time, your backend isn't able to keep up";
		else
			timeInfo += Utility::FormatDuration(timeToZero);
	}

	/* Remember this sample for the next gradient calculation. */
	m_PendingTasks = pending;
	m_PendingTasksTimestamp = now;

	Log(LogInformation, "InfluxdbWriter")
	    << "Work queue items: " << pending
	    << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
	    << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
	    << timeInfo;
}
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
{
socket = new TcpSocket();
@ -207,6 +176,13 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
/**
 * Defers processing of a check result to the work queue.
 *
 * @param checkable The checkable the result belongs to.
 * @param cr The check result.
 */
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	m_WorkQueue.Enqueue(
	    boost::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr)
	);
}
void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
@ -363,6 +339,10 @@ String InfluxdbWriter::EscapeField(const String& str)
if (boost::regex_match(str.GetData(), boolean_false))
return "false";
// Handle NaNs
if (boost::math::isnan(str))
return 0;
// Otherwise it's a string and needs escaping and quoting
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
@ -443,22 +423,12 @@ void InfluxdbWriter::Flush(void)
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
// Asynchronously flush the metric body to InfluxDB
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
}
void InfluxdbWriter::FlushHandler(const String& body)
{
AssertOnWorkQueue();
TcpSocket::Ptr socket;
Stream::Ptr stream = Connect(socket);
if (!stream)
return;
IncreaseTaskCount();
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
url->SetHost(GetHost());
@ -544,20 +514,6 @@ void InfluxdbWriter::FlushHandler(const String& body)
}
}
/**
 * Records one task at the current time in the task statistics buffer.
 *
 * The statistics mutex is held while the value is inserted.
 */
void InfluxdbWriter::IncreaseTaskCount(void)
{
	/* Take the timestamp before acquiring the lock. */
	const double timestamp = Utility::GetTime();

	boost::mutex::scoped_lock guard(m_StatsMutex);
	m_TaskStats.InsertValue(timestamp, 1);
}
/**
 * Returns the value the task statistics buffer reports for the given span.
 *
 * The statistics mutex is held while the buffer is read.
 *
 * @param span The span passed on to RingBuffer::GetValues().
 * @returns The result of RingBuffer::GetValues() for that span.
 */
int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
{
	boost::mutex::scoped_lock guard(m_StatsMutex);

	return m_TaskStats.GetValues(span);
}
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);

View File

@ -25,7 +25,6 @@
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/ringbuffer.hpp"
#include "base/workqueue.hpp"
#include <boost/thread/mutex.hpp>
#include <fstream>
@ -48,8 +47,6 @@ public:
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
int GetTaskCount(RingBuffer::SizeType span) const;
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
@ -58,30 +55,19 @@ protected:
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
void IncreaseTaskCount(void);
private:
WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
boost::mutex m_DataBufferMutex;
mutable boost::mutex m_StatsMutex;
RingBuffer m_TaskStats;
int m_PendingTasks;
double m_PendingTasksTimestamp;
Timer::Ptr m_StatsLoggerTimer;
void StatsLoggerTimerHandler(void);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout(void);
void Flush(void);
void FlushHandler(const String& body);
static String FormatInteger(int val);
static String FormatBoolean(bool val);

View File

@ -349,7 +349,7 @@ int JsonRpcConnection::GetWorkQueueLength(void)
{
size_t itemCount = 0;
for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
for (size_t i = 0; i < GetWorkQueueCount(); i++) {
itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();
}
@ -359,11 +359,16 @@ int JsonRpcConnection::GetWorkQueueLength(void)
double JsonRpcConnection::GetWorkQueueRate(void)
{
double rate = 0.0;
int count = GetWorkQueueCount();
for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
/* If this is a standalone environment, we don't have any queues. */
if (count == 0)
return 0.0;
for (size_t i = 0; i < count; i++) {
rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;
}
return rate / l_JsonRpcConnectionWorkQueueCount;
return rate / count;
}