InfluxdbCommonWriter: only flush from work queue

There is no explicit synchronization of access to m_DataBuffer which is fine if
it is only accessed from the single-threaded work queue. However, Stop() also
called Flush() in another thread, leading to concurrent write access to
m_DataBuffer which can result in a crash due to use after free/double free.

Changes in this commit:
* Flush() is renamed to FlushWQ() to show that it should only be called from
  the work queue. Additionally, it now asserts that it is running on the work
  queue.
* Visibility of some data members is changed from protected to private. No
  other classes have to access these at the moment. By this change, accidental
  concurrent access from derived classes in the future is prevented.
* Stop() now flushes by posting FlushWQ() to the work queue and joining it.
This commit is contained in:
Julian Brost 2021-12-16 14:39:56 +01:00
parent cbbaf4eac8
commit e6300aacf9
2 changed files with 11 additions and 19 deletions

View File

@ -107,22 +107,13 @@ void InfluxdbCommonWriter::Pause()
{
/* Force a flush. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing pending data buffers.";
<< "Processing pending tasks and flushing data buffers.";
Flush();
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Joining existing WQ tasks.";
m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
m_WorkQueue.Join();
/* Flush again after the WQ tasks have filled the data buffer. */
Log(LogDebug, GetReflectionType()->GetName())
<< "Flushing data buffers from WQ tasks.";
Flush();
Log(LogInformation, GetReflectionType()->GetName())
<< "'" << GetName() << "' paused.";
@ -418,7 +409,7 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
try {
Flush();
FlushWQ();
} catch (...) {
/* Do nothing. */
}
@ -437,11 +428,13 @@ void InfluxdbCommonWriter::FlushTimeoutWQ()
Log(LogDebug, GetReflectionType()->GetName())
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
FlushWQ();
}
void InfluxdbCommonWriter::Flush()
void InfluxdbCommonWriter::FlushWQ()
{
AssertOnWorkQueue();
namespace beast = boost::beast;
namespace http = beast::http;

View File

@ -36,9 +36,6 @@ public:
void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
protected:
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
void OnConfigLoaded() override;
void Resume() override;
void Pause() override;
@ -50,6 +47,8 @@ protected:
private:
Timer::Ptr m_FlushTimer;
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
@ -57,7 +56,7 @@ private:
const String& label, const Dictionary::Ptr& fields, double ts);
void FlushTimeout();
void FlushTimeoutWQ();
void Flush();
void FlushWQ();
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);