mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-23 01:38:11 +02:00
Merge pull request #9247 from Icinga/bugfix/influxdb-writer-synchronization-212
Fix unsafe concurrent access to m_DataBuffer in InfluxdbWriter
This commit is contained in:
commit
0ccd7b799c
@ -89,7 +89,7 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
||||
for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
|
||||
size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
|
||||
size_t dataBufferItems = influxdbwriter->m_DataBufferSize;
|
||||
|
||||
nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
@ -131,22 +131,13 @@ void InfluxdbWriter::Pause()
|
||||
{
|
||||
/* Force a flush. */
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Flushing pending data buffers.";
|
||||
<< "Processing pending tasks and flushing data buffers.";
|
||||
|
||||
Flush();
|
||||
|
||||
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Joining existing WQ tasks.";
|
||||
m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
|
||||
|
||||
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
|
||||
m_WorkQueue.Join();
|
||||
|
||||
/* Flush again after the WQ tasks have filled the data buffer. */
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Flushing data buffers from WQ tasks.";
|
||||
|
||||
Flush();
|
||||
|
||||
Log(LogInformation, "InfluxdbWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
||||
@ -435,6 +426,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar
|
||||
|
||||
// Buffer the data point
|
||||
m_DataBuffer.emplace_back(msgbuf.str());
|
||||
m_DataBufferSize = m_DataBuffer.size();
|
||||
|
||||
// Flush if we've buffered too much to prevent excessive memory use
|
||||
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
|
||||
@ -442,7 +434,7 @@ void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionar
|
||||
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
|
||||
|
||||
try {
|
||||
Flush();
|
||||
FlushWQ();
|
||||
} catch (...) {
|
||||
/* Do nothing. */
|
||||
}
|
||||
@ -461,11 +453,13 @@ void InfluxdbWriter::FlushTimeoutWQ()
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
|
||||
Flush();
|
||||
FlushWQ();
|
||||
}
|
||||
|
||||
void InfluxdbWriter::Flush()
|
||||
void InfluxdbWriter::FlushWQ()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
namespace beast = boost::beast;
|
||||
namespace http = beast::http;
|
||||
|
||||
@ -478,6 +472,7 @@ void InfluxdbWriter::Flush()
|
||||
|
||||
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
||||
m_DataBuffer.clear();
|
||||
m_DataBufferSize = 0;
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <atomic>
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
@ -37,9 +38,10 @@ protected:
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
Timer::Ptr m_FlushTimer;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::atomic_size_t m_DataBufferSize{0};
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
@ -47,7 +49,7 @@ private:
|
||||
const String& label, const Dictionary::Ptr& fields, double ts);
|
||||
void FlushTimeout();
|
||||
void FlushTimeoutWQ();
|
||||
void Flush();
|
||||
void FlushWQ();
|
||||
|
||||
static String EscapeKeyOrTagValue(const String& str);
|
||||
static String EscapeValue(const Value& value);
|
||||
|
Loading…
x
Reference in New Issue
Block a user