From e6300aacf9e2872e8145529b9d288b04b72358aa Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 16 Dec 2021 14:39:56 +0100 Subject: [PATCH] InfluxdbCommonWriter: only flush from work queue There is no explicit synchronization of access to m_DataBuffer which is fine if it is only accessed from the single-threaded work queue. However, Stop() also called Flush() in another thread, leading to concurrent write access to m_DataBuffer which can result in a crash due to use after free/double free. Changes in this commit: * Flush() is renamed to FlushWQ() to show that it should only be called from the work queue. Additionally, it now asserts that it is running on the work queue. * Visibility of some data members is changed from protected to private. No other classes have to access these at the moment. By this change, accidental concurrent access from derived classes in the future is prevented. * Stop() now flushes by posting FlushWQ() to the work queue and joining it. --- lib/perfdata/influxdbcommonwriter.cpp | 23 ++++++++--------------- lib/perfdata/influxdbcommonwriter.hpp | 7 +++---- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 90e278ae0..071abae93 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -107,22 +107,13 @@ void InfluxdbCommonWriter::Pause() { /* Force a flush. */ Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing pending data buffers."; + << "Processing pending tasks and flushing data buffers."; - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Joining existing WQ tasks."; + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, GetReflectionType()->GetName()) - << "Flushing data buffers from WQ tasks."; - - Flush(); - Log(LogInformation, GetReflectionType()->GetName()) << "'" << GetName() << "' paused."; @@ -418,7 +409,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; try { - Flush(); + FlushWQ(); } catch (...) { /* Do nothing. */ } @@ -437,11 +428,13 @@ void InfluxdbCommonWriter::FlushTimeoutWQ() Log(LogDebug, GetReflectionType()->GetName()) << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + FlushWQ(); } -void InfluxdbCommonWriter::Flush() +void InfluxdbCommonWriter::FlushWQ() { + AssertOnWorkQueue(); + namespace beast = boost::beast; namespace http = beast::http; diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 06e860841..ad47e6100 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -36,9 +36,6 @@ public: void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; protected: - WorkQueue m_WorkQueue{10000000, 1}; - std::vector m_DataBuffer; - void OnConfigLoaded() override; void Resume() override; void Pause() override; @@ -50,6 +47,8 @@ protected: private: Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; + std::vector m_DataBuffer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -57,7 +56,7 @@ private: const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); void FlushTimeoutWQ(); - void Flush(); + void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value);