diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 1aafffced..9ce914a63 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -107,22 +107,13 @@ void InfluxdbCommonWriter::Pause() { /* Force a flush. */ Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing pending data buffers."; + << "Processing pending tasks and flushing data buffers."; - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Joining existing WQ tasks."; + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing data buffers from WQ tasks."; - - Flush(); - Log(LogInformation, GetReflectionType()->GetName()) << "'" << GetName() << "' paused."; @@ -418,7 +409,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; try { - Flush(); + FlushWQ(); } catch (...) { /* Do nothing. */ } @@ -437,11 +428,13 @@ void InfluxdbCommonWriter::FlushTimeoutWQ() Log(LogDebug, GetReflectionType()->GetName()) << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + FlushWQ(); } -void InfluxdbCommonWriter::Flush() +void InfluxdbCommonWriter::FlushWQ() { + AssertOnWorkQueue(); + namespace beast = boost::beast; namespace http = beast::http; diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 06e860841..ad47e6100 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -36,9 +36,6 @@ public: void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; protected: - WorkQueue m_WorkQueue{10000000, 1}; - std::vector m_DataBuffer; - void OnConfigLoaded() override; void Resume() override; void Pause() override; @@ -50,6 +47,8 @@ protected: private: Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; + std::vector m_DataBuffer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -57,7 +56,7 @@ private: const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); void FlushTimeoutWQ(); - void Flush(); + void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value);