diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 1aafffced..f34ef6710 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -107,22 +107,13 @@ void InfluxdbCommonWriter::Pause() { /* Force a flush. */ Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing pending data buffers."; + << "Processing pending tasks and flushing data buffers."; - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Joining existing WQ tasks."; + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing data buffers from WQ tasks."; - - Flush(); - Log(LogInformation, GetReflectionType()->GetName()) << "'" << GetName() << "' paused."; @@ -411,6 +402,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic // Buffer the data point m_DataBuffer.emplace_back(msgbuf.str()); + m_DataBufferSize = m_DataBuffer.size(); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { @@ -418,7 +410,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; try { - Flush(); + FlushWQ(); } catch (...) { /* Do nothing. */ } @@ -437,11 +429,13 @@ void InfluxdbCommonWriter::FlushTimeoutWQ() Log(LogDebug, GetReflectionType()->GetName()) << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + FlushWQ(); } -void InfluxdbCommonWriter::Flush() +void InfluxdbCommonWriter::FlushWQ() { + AssertOnWorkQueue(); + namespace beast = boost::beast; namespace http = beast::http; @@ -454,6 +448,7 @@ void InfluxdbCommonWriter::Flush() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); + m_DataBufferSize = 0; OptionalTlsStream stream; diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 06e860841..791a326b4 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -14,6 +14,7 @@ #include "remote/url.hpp" #include #include +#include #include namespace icinga @@ -36,9 +37,6 @@ public: void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; protected: - WorkQueue m_WorkQueue{10000000, 1}; - std::vector m_DataBuffer; - void OnConfigLoaded() override; void Resume() override; void Pause() override; @@ -50,6 +48,9 @@ protected: private: Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; + std::vector m_DataBuffer; + std::atomic_size_t m_DataBufferSize{0}; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -57,7 +58,7 @@ private: const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); void FlushTimeoutWQ(); - void Flush(); + void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); @@ -78,7 +79,7 @@ void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array: for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = influxwriter->m_WorkQueue.GetLength(); double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - size_t dataBufferItems = influxwriter->m_DataBuffer.size(); + size_t dataBufferItems = influxwriter->m_DataBufferSize; nodes.emplace_back(influxwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems },