InfluxDB: Optimize work queue event handling

refs #5219
This commit is contained in:
Michael Friedrich 2017-05-26 17:03:49 +02:00
parent d0dcb8a658
commit dab2522acc
2 changed files with 18 additions and 13 deletions

View File

@ -43,6 +43,7 @@
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
@ -52,7 +53,6 @@ REGISTER_TYPE(InfluxdbWriter);
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
InfluxdbWriter::InfluxdbWriter(void)
: m_WorkQueue(10000000, 1)
{ }
@ -64,7 +64,7 @@ void InfluxdbWriter::OnConfigLoaded(void)
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
@ -73,13 +73,16 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
//TODO: Collect more stats
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
stats->Set("data_buffer_items", dataBufferItems);
nodes->Set(influxdbwriter->GetName(), stats);
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems));
}
status->Set("influxdbwriter", nodes);
@ -173,6 +176,13 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr));
}
void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
@ -329,6 +339,10 @@ String InfluxdbWriter::EscapeField(const String& str)
if (boost::regex_match(str.GetData(), boolean_false))
return "false";
// Handle NaNs
if (boost::math::isnan(str))
return 0;
// Otherwise it's a string and needs escaping and quoting
String result = str;
boost::algorithm::replace_all(result, "\"", "\\\"");
@ -409,14 +423,6 @@ void InfluxdbWriter::Flush(void)
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
// Asynchronously flush the metric body to InfluxDB
m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
}
void InfluxdbWriter::FlushHandler(const String& body)
{
AssertOnWorkQueue();
TcpSocket::Ptr socket;
Stream::Ptr stream = Connect(socket);

View File

@ -62,13 +62,12 @@ private:
boost::mutex m_DataBufferMutex;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout(void);
void Flush(void);
void FlushHandler(const String& body);
static String FormatInteger(int val);
static String FormatBoolean(bool val);