InfluxdbWriter: use atomic_size_t to data buffer size from stats function

m_DataBuffer may be modified concurrently while StatsFunc() is called, thus
it's unsafe to call size() on it. As write access to m_DataBuffer is already
synchronized by only modifying it from the single work queue thread, instead of
adding a mutex, this commit adds a new std::atomic_size_t which is additionally
updated when modifying m_DataBuffer and can safely be accessed in StatsFunc().
This commit is contained in:
Julian Brost 2021-12-16 14:49:50 +01:00 committed by Yonas Habteab
parent a2a5c3f28d
commit 4d28a01b84
2 changed files with 5 additions and 1 deletions

View File

@ -89,7 +89,7 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
size_t dataBufferItems = influxdbwriter->m_DataBufferSize;
nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
@ -426,6 +426,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar
// Buffer the data point
m_DataBuffer.emplace_back(msgbuf.str());
m_DataBufferSize = m_DataBuffer.size();
// Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
@ -471,6 +472,7 @@ void InfluxdbWriter::FlushWQ()
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
m_DataBufferSize = 0;
OptionalTlsStream stream;

View File

@ -10,6 +10,7 @@
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include <atomic>
#include <fstream>
namespace icinga
@ -40,6 +41,7 @@ private:
Timer::Ptr m_FlushTimer;
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
std::atomic_size_t m_DataBufferSize{0};
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);