From a2a5c3f28d66c3f92b0a17311f66acab7b786afc Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 16 Dec 2021 14:39:56 +0100 Subject: [PATCH 1/2] InfluxdbWriter: only flush from work queue There is no explicit synchronization of access to m_DataBuffer which is fine if it is only accessed from the single-threaded work queue. However, Stop() also called Flush() in another thread, leading to concurrent write access to m_DataBuffer which can result in a crash due to use after free/double free. Changes in this commit: * Flush() is renamed to FlushWQ() to show that it should only be called from the work queue. Additionally, it now asserts that it is running on the work queue. * Visibility of some data members is changed from protected to private. No other classes have to access these at the moment. By this change, accidental concurrent access from derived classes in the future is prevented. * Stop() now flushes by posting FlushWQ() to the work queue and joining it. --- lib/perfdata/influxdbwriter.cpp | 23 ++++++++--------------- lib/perfdata/influxdbwriter.hpp | 4 ++-- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index bcd5ba0e4..c2e29f1a2 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -131,22 +131,13 @@ void InfluxdbWriter::Pause() { /* Force a flush. */ Log(LogDebug, "InfluxdbWriter") - << "Flushing pending data buffers."; + << "Processing pending tasks and flushing data buffers."; - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, "InfluxdbWriter") - << "Joining existing WQ tasks."; + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, "InfluxdbWriter") - << "Flushing data buffers from WQ tasks."; - - Flush(); - Log(LogInformation, "InfluxdbWriter") << "'" << GetName() << "' paused."; @@ -442,7 +433,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; try { - Flush(); + FlushWQ(); } catch (...) { /* Do nothing. */ } @@ -461,11 +452,13 @@ void InfluxdbWriter::FlushTimeoutWQ() Log(LogDebug, "InfluxdbWriter") << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + FlushWQ(); } -void InfluxdbWriter::Flush() +void InfluxdbWriter::FlushWQ() { + AssertOnWorkQueue(); + namespace beast = boost::beast; namespace http = beast::http; diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index 1f7ab8309..39344bf2e 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -37,8 +37,8 @@ protected: void Pause() override; private: - WorkQueue m_WorkQueue{10000000, 1}; Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -47,7 +47,7 @@ private: const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); void FlushTimeoutWQ(); - void Flush(); + void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); From 4d28a01b84ba0b422a8c605c4e091ed5851cd173 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 16 Dec 2021 14:49:50 +0100 Subject: [PATCH 2/2] InfluxdbWriter: use atomic_size_t to data buffer size from stats function m_DataBuffer may be modified concurrently while StatsFunc() is called, thus it's unsafe to call size() on it. As write access to m_DataBuffer is already synchronized by only modifying it from the single work queue thread, instead of adding a mutex, this commit adds a new std::atomic_size_t which is additionally updated when modifying m_DataBuffer and can safely be accessed in StatsFunc(). --- lib/perfdata/influxdbwriter.cpp | 4 +++- lib/perfdata/influxdbwriter.hpp | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index c2e29f1a2..55eea910c 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -89,7 +89,7 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength(); double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - size_t dataBufferItems = influxdbwriter->m_DataBuffer.size(); + size_t dataBufferItems = influxdbwriter->m_DataBufferSize; nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, @@ -426,6 +426,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar // Buffer the data point m_DataBuffer.emplace_back(msgbuf.str()); + m_DataBufferSize = m_DataBuffer.size(); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { @@ -471,6 +472,7 @@ void InfluxdbWriter::FlushWQ() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); + m_DataBufferSize = 0; OptionalTlsStream stream; diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index 39344bf2e..26d148364 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -10,6 +10,7 @@ #include "base/timer.hpp" #include "base/tlsstream.hpp" #include "base/workqueue.hpp" +#include #include namespace icinga @@ -40,6 +41,7 @@ private: Timer::Ptr m_FlushTimer; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; + std::atomic_size_t m_DataBufferSize{0}; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);