Merge pull request #9137 from Icinga/bugfix/influxdb-writer-synchronization

Fix unsafe concurrent access to m_DataBuffer in InfluxdbCommonWriter
This commit is contained in:
Alexander Aleksandrovič Klimov 2022-01-04 17:37:28 +01:00 committed by GitHub
commit 1b50d912a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 20 deletions

View File

@ -107,22 +107,13 @@ void InfluxdbCommonWriter::Pause()
{ {
/* Force a flush. */ /* Force a flush. */
Log(LogDebug, GetReflectionType()->GetName()) Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing pending data buffers."; << "Processing pending tasks and flushing data buffers.";
Flush(); m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Joining existing WQ tasks.";
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
m_WorkQueue.Join(); m_WorkQueue.Join();
/* Flush again after the WQ tasks have filled the data buffer. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing data buffers from WQ tasks.";
Flush();
Log(LogInformation, GetReflectionType()->GetName()) Log(LogInformation, GetReflectionType()->GetName())
<< "'" << GetName() << "' paused."; << "'" << GetName() << "' paused.";
@ -411,6 +402,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
// Buffer the data point // Buffer the data point
m_DataBuffer.emplace_back(msgbuf.str()); m_DataBuffer.emplace_back(msgbuf.str());
m_DataBufferSize = m_DataBuffer.size();
// Flush if we've buffered too much to prevent excessive memory use // Flush if we've buffered too much to prevent excessive memory use
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) { if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
@ -418,7 +410,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
try { try {
Flush(); FlushWQ();
} catch (...) { } catch (...) {
/* Do nothing. */ /* Do nothing. */
} }
@ -437,11 +429,13 @@ void InfluxdbCommonWriter::FlushTimeoutWQ()
Log(LogDebug, GetReflectionType()->GetName()) Log(LogDebug, GetReflectionType()->GetName())
<< "Timer expired writing " << m_DataBuffer.size() << " data points"; << "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush(); FlushWQ();
} }
void InfluxdbCommonWriter::Flush() void InfluxdbCommonWriter::FlushWQ()
{ {
AssertOnWorkQueue();
namespace beast = boost::beast; namespace beast = boost::beast;
namespace http = beast::http; namespace http = beast::http;
@ -454,6 +448,7 @@ void InfluxdbCommonWriter::Flush()
String body = boost::algorithm::join(m_DataBuffer, "\n"); String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear(); m_DataBuffer.clear();
m_DataBufferSize = 0;
OptionalTlsStream stream; OptionalTlsStream stream;

View File

@ -14,6 +14,7 @@
#include "remote/url.hpp" #include "remote/url.hpp"
#include <boost/beast/http/message.hpp> #include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp> #include <boost/beast/http/string_body.hpp>
#include <atomic>
#include <fstream> #include <fstream>
namespace icinga namespace icinga
@ -36,9 +37,6 @@ public:
void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override; void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
protected: protected:
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
void OnConfigLoaded() override; void OnConfigLoaded() override;
void Resume() override; void Resume() override;
void Pause() override; void Pause() override;
@ -50,6 +48,9 @@ protected:
private: private:
Timer::Ptr m_FlushTimer; Timer::Ptr m_FlushTimer;
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
std::atomic_size_t m_DataBufferSize{0};
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
@ -57,7 +58,7 @@ private:
const String& label, const Dictionary::Ptr& fields, double ts); const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout(); void FlushTimeout();
void FlushTimeoutWQ(); void FlushTimeoutWQ();
void Flush(); void FlushWQ();
static String EscapeKeyOrTagValue(const String& str); static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value); static String EscapeValue(const Value& value);
@ -78,7 +79,7 @@ void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array:
for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) { for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) {
size_t workQueueItems = influxwriter->m_WorkQueue.GetLength(); size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0; double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
size_t dataBufferItems = influxwriter->m_DataBuffer.size(); size_t dataBufferItems = influxwriter->m_DataBufferSize;
nodes.emplace_back(influxwriter->GetName(), new Dictionary({ nodes.emplace_back(influxwriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems }, { "work_queue_items", workQueueItems },