mirror of https://github.com/Icinga/icinga2.git
InfluxdbCommonWriter: use atomic_size_t to data buffer size from stats function
m_DataBuffer may be modified concurrently while StatsFunc() is called, thus it's unsafe to call size() on it. As write access to m_DataBuffer is already synchronized by only modifying it from the single work queue thread, instead of adding a mutex, this commit adds a new std::atomic_size_t which is additionally updated when modifying m_DataBuffer and can safely be accessed in StatsFunc().
This commit is contained in:
parent
e6300aacf9
commit
33781496da
|
@ -402,6 +402,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
|
|||
|
||||
// Buffer the data point
|
||||
m_DataBuffer.emplace_back(msgbuf.str());
|
||||
m_DataBufferSize = m_DataBuffer.size();
|
||||
|
||||
// Flush if we've buffered too much to prevent excessive memory use
|
||||
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
|
||||
|
@ -447,6 +448,7 @@ void InfluxdbCommonWriter::FlushWQ()
|
|||
|
||||
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
||||
m_DataBuffer.clear();
|
||||
m_DataBufferSize = 0;
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "remote/url.hpp"
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <atomic>
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
|
@ -49,6 +50,7 @@ private:
|
|||
Timer::Ptr m_FlushTimer;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::atomic_size_t m_DataBufferSize{0};
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
|
@ -77,7 +79,7 @@ void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array:
|
|||
for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) {
|
||||
size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
size_t dataBufferItems = influxwriter->m_DataBuffer.size();
|
||||
size_t dataBufferItems = influxwriter->m_DataBufferSize;
|
||||
|
||||
nodes.emplace_back(influxwriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
|
|
Loading…
Reference in New Issue