From 33781496daa3a1863583874ae8c8555d5dd6f51a Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 16 Dec 2021 14:49:50 +0100 Subject: [PATCH] InfluxdbCommonWriter: use atomic_size_t to data buffer size from stats function m_DataBuffer may be modified concurrently while StatsFunc() is called, thus it's unsafe to call size() on it. As write access to m_DataBuffer is already synchronized by only modifying it from the single work queue thread, instead of adding a mutex, this commit adds a new std::atomic_size_t which is additionally updated when modifying m_DataBuffer and can safely be accessed in StatsFunc(). --- lib/perfdata/influxdbcommonwriter.cpp | 2 ++ lib/perfdata/influxdbcommonwriter.hpp | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 071abae93..98eb453b7 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -402,6 +402,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic // Buffer the data point m_DataBuffer.emplace_back(msgbuf.str()); + m_DataBufferSize = m_DataBuffer.size(); // Flush if we've buffered too much to prevent excessive memory use if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { @@ -447,6 +448,7 @@ void InfluxdbCommonWriter::FlushWQ() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); + m_DataBufferSize = 0; OptionalTlsStream stream; diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index ad47e6100..791a326b4 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -14,6 +14,7 @@ #include "remote/url.hpp" #include #include +#include #include namespace icinga @@ -49,6 +50,7 @@ private: Timer::Ptr m_FlushTimer; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; + std::atomic_size_t m_DataBufferSize{0}; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -77,7 +79,7 @@ void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array: for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = influxwriter->m_WorkQueue.GetLength(); double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - size_t dataBufferItems = influxwriter->m_DataBuffer.size(); + size_t dataBufferItems = influxwriter->m_DataBufferSize; nodes.emplace_back(influxwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems },