mirror of https://github.com/Icinga/icinga2.git
InfluxDB: Remove obsolete logger, now implemented in WorkQueue class
refs #5280 refs #5133
This commit is contained in:
parent
3a5d4f3c8d
commit
647d82094f
|
@ -54,7 +54,7 @@ REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
|
||||||
|
|
||||||
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
|
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
|
||||||
InfluxdbWriter::InfluxdbWriter(void)
|
InfluxdbWriter::InfluxdbWriter(void)
|
||||||
: m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
|
: m_WorkQueue(10000000, 1)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
void InfluxdbWriter::OnConfigLoaded(void)
|
void InfluxdbWriter::OnConfigLoaded(void)
|
||||||
|
@ -102,12 +102,6 @@ void InfluxdbWriter::Start(bool runtimeCreated)
|
||||||
m_FlushTimer->Start();
|
m_FlushTimer->Start();
|
||||||
m_FlushTimer->Reschedule(0);
|
m_FlushTimer->Reschedule(0);
|
||||||
|
|
||||||
/* Timer for updating and logging work queue stats */
|
|
||||||
m_StatsLoggerTimer = new Timer();
|
|
||||||
m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
|
|
||||||
m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
|
|
||||||
m_StatsLoggerTimer->Start();
|
|
||||||
|
|
||||||
/* Register for new metrics. */
|
/* Register for new metrics. */
|
||||||
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||||
}
|
}
|
||||||
|
@ -137,34 +131,6 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
|
||||||
//TODO: Close the connection, if we keep it open.
|
//TODO: Close the connection, if we keep it open.
|
||||||
}
|
}
|
||||||
|
|
||||||
void InfluxdbWriter::StatsLoggerTimerHandler(void)
|
|
||||||
{
|
|
||||||
int pending = m_WorkQueue.GetLength();
|
|
||||||
|
|
||||||
double now = Utility::GetTime();
|
|
||||||
double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
|
|
||||||
double timeToZero = pending / gradient;
|
|
||||||
|
|
||||||
String timeInfo;
|
|
||||||
|
|
||||||
if (pending > GetTaskCount(5)) {
|
|
||||||
timeInfo = " empty in ";
|
|
||||||
if (timeToZero < 0)
|
|
||||||
timeInfo += "infinite time, your backend isn't able to keep up";
|
|
||||||
else
|
|
||||||
timeInfo += Utility::FormatDuration(timeToZero);
|
|
||||||
}
|
|
||||||
|
|
||||||
m_PendingTasks = pending;
|
|
||||||
m_PendingTasksTimestamp = now;
|
|
||||||
|
|
||||||
Log(LogInformation, "InfluxdbWriter")
|
|
||||||
<< "Work queue items: " << pending
|
|
||||||
<< ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
|
|
||||||
<< " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
|
|
||||||
<< timeInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
|
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
|
||||||
{
|
{
|
||||||
socket = new TcpSocket();
|
socket = new TcpSocket();
|
||||||
|
@ -457,8 +423,6 @@ void InfluxdbWriter::FlushHandler(const String& body)
|
||||||
if (!stream)
|
if (!stream)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
IncreaseTaskCount();
|
|
||||||
|
|
||||||
Url::Ptr url = new Url();
|
Url::Ptr url = new Url();
|
||||||
url->SetScheme(GetSslEnable() ? "https" : "http");
|
url->SetScheme(GetSslEnable() ? "https" : "http");
|
||||||
url->SetHost(GetHost());
|
url->SetHost(GetHost());
|
||||||
|
@ -544,20 +508,6 @@ void InfluxdbWriter::FlushHandler(const String& body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void InfluxdbWriter::IncreaseTaskCount(void)
|
|
||||||
{
|
|
||||||
double now = Utility::GetTime();
|
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
|
||||||
m_TaskStats.InsertValue(now, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
|
|
||||||
{
|
|
||||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
|
||||||
return m_TaskStats.GetValues(span);
|
|
||||||
}
|
|
||||||
|
|
||||||
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
|
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
|
||||||
{
|
{
|
||||||
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
|
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
|
||||||
|
|
|
@ -25,7 +25,6 @@
|
||||||
#include "base/configobject.hpp"
|
#include "base/configobject.hpp"
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
#include "base/ringbuffer.hpp"
|
|
||||||
#include "base/workqueue.hpp"
|
#include "base/workqueue.hpp"
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
@ -48,8 +47,6 @@ public:
|
||||||
|
|
||||||
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
||||||
|
|
||||||
int GetTaskCount(RingBuffer::SizeType span) const;
|
|
||||||
|
|
||||||
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||||
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
|
||||||
|
|
||||||
|
@ -58,22 +55,12 @@ protected:
|
||||||
virtual void Start(bool runtimeCreated) override;
|
virtual void Start(bool runtimeCreated) override;
|
||||||
virtual void Stop(bool runtimeRemoved) override;
|
virtual void Stop(bool runtimeRemoved) override;
|
||||||
|
|
||||||
void IncreaseTaskCount(void);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
WorkQueue m_WorkQueue;
|
WorkQueue m_WorkQueue;
|
||||||
Timer::Ptr m_FlushTimer;
|
Timer::Ptr m_FlushTimer;
|
||||||
std::vector<String> m_DataBuffer;
|
std::vector<String> m_DataBuffer;
|
||||||
boost::mutex m_DataBufferMutex;
|
boost::mutex m_DataBufferMutex;
|
||||||
|
|
||||||
mutable boost::mutex m_StatsMutex;
|
|
||||||
RingBuffer m_TaskStats;
|
|
||||||
int m_PendingTasks;
|
|
||||||
double m_PendingTasksTimestamp;
|
|
||||||
|
|
||||||
Timer::Ptr m_StatsLoggerTimer;
|
|
||||||
void StatsLoggerTimerHandler(void);
|
|
||||||
|
|
||||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||||
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
|
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
|
||||||
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
|
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);
|
||||||
|
|
Loading…
Reference in New Issue