diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index bcd5ba0e4..55eea910c 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -89,7 +89,7 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength(); double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - size_t dataBufferItems = influxdbwriter->m_DataBuffer.size(); + size_t dataBufferItems = influxdbwriter->m_DataBufferSize; nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, @@ -131,22 +131,13 @@ void InfluxdbWriter::Pause() { /* Force a flush. */ Log(LogDebug, "InfluxdbWriter") - << "Flushing pending data buffers."; + << "Processing pending tasks and flushing data buffers."; - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, "InfluxdbWriter") - << "Joining existing WQ tasks."; + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, "InfluxdbWriter") - << "Flushing data buffers from WQ tasks."; - - Flush(); - Log(LogInformation, "InfluxdbWriter") << "'" << GetName() << "' paused."; @@ -435,6 +426,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar // Buffer the data point m_DataBuffer.emplace_back(msgbuf.str()); + m_DataBufferSize = m_DataBuffer.size(); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { @@ -442,7 +434,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; try { - Flush(); + FlushWQ(); } catch (...) { /* Do nothing. */ } @@ -461,11 +453,13 @@ void InfluxdbWriter::FlushTimeoutWQ() Log(LogDebug, "InfluxdbWriter") << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + FlushWQ(); } -void InfluxdbWriter::Flush() +void InfluxdbWriter::FlushWQ() { + AssertOnWorkQueue(); + namespace beast = boost::beast; namespace http = beast::http; @@ -478,6 +472,7 @@ void InfluxdbWriter::Flush() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); + m_DataBufferSize = 0; OptionalTlsStream stream; diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index 1f7ab8309..26d148364 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -10,6 +10,7 @@ #include "base/timer.hpp" #include "base/tlsstream.hpp" #include "base/workqueue.hpp" +#include #include namespace icinga @@ -37,9 +38,10 @@ protected: void Pause() override; private: - WorkQueue m_WorkQueue{10000000, 1}; Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; + std::atomic_size_t m_DataBufferSize{0}; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -47,7 +49,7 @@ private: const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); void FlushTimeoutWQ(); - void Flush(); + void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value);