Implement HA functionality for InfluxDB feature

This commit is contained in:
Michael Friedrich 2018-10-24 13:38:06 +02:00
parent 37ccffc7e5
commit 5e241dc815
3 changed files with 23 additions and 8 deletions

View File

@ -75,6 +75,15 @@ void InfluxdbWriter::OnConfigLoaded()
ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "InfluxdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -100,12 +109,12 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
status->Set("influxdbwriter", new Dictionary(std::move(nodes)));
}
void InfluxdbWriter::Start(bool runtimeCreated)
void InfluxdbWriter::Resume()
{
ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
ObjectImpl<InfluxdbWriter>::Resume();
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(std::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
@ -121,14 +130,14 @@ void InfluxdbWriter::Start(bool runtimeCreated)
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}
void InfluxdbWriter::Stop(bool runtimeRemoved)
void InfluxdbWriter::Pause()
{
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
m_WorkQueue.Join();
ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
ObjectImpl<InfluxdbWriter>::Pause();
}
void InfluxdbWriter::AssertOnWorkQueue()
@ -188,6 +197,9 @@ Stream::Ptr InfluxdbWriter::Connect()
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow);
}

View File

@ -49,8 +49,8 @@ public:
protected:
void OnConfigLoaded() override;
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void Resume() override;
void Pause() override;
private:
WorkQueue m_WorkQueue{10000000, 1};

View File

@ -88,6 +88,9 @@ class InfluxdbWriter : ConfigObject
[config] int flush_threshold {
default {{{ return 1024; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
validator InfluxdbWriter {