Implement HA functionality for Elasticsearch feature

This commit is contained in:
Michael Friedrich 2018-10-24 13:20:03 +02:00
parent 001ffda61c
commit 59389f564c
3 changed files with 29 additions and 8 deletions

View File

@ -49,6 +49,15 @@ void ElasticsearchWriter::OnConfigLoaded()
ObjectImpl<ElasticsearchWriter>::OnConfigLoaded(); ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
m_WorkQueue.SetName("ElasticsearchWriter, " + GetName()); m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "ElasticsearchWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
} }
void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -71,14 +80,14 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
} }
void ElasticsearchWriter::Start(bool runtimeCreated) void ElasticsearchWriter::Resume()
{ {
ObjectImpl<ElasticsearchWriter>::Start(runtimeCreated); ObjectImpl<ElasticsearchWriter>::Resume();
m_EventPrefix = "icinga2.event."; m_EventPrefix = "icinga2.event.";
Log(LogInformation, "ElasticsearchWriter") Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' started."; << "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1)); m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
@ -95,14 +104,14 @@ void ElasticsearchWriter::Start(bool runtimeCreated)
Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7)); Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
} }
void ElasticsearchWriter::Stop(bool runtimeRemoved) void ElasticsearchWriter::Pause()
{ {
Log(LogInformation, "ElasticsearchWriter") Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' stopped."; << "'" << GetName() << "' paused.";
m_WorkQueue.Join(); m_WorkQueue.Join();
ObjectImpl<ElasticsearchWriter>::Stop(runtimeRemoved); ObjectImpl<ElasticsearchWriter>::Pause();
} }
void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -176,6 +185,9 @@ void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Ch
void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr)); m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
} }
@ -230,6 +242,9 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type)); m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
} }
@ -279,6 +294,9 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text) const CheckResult::Ptr& cr, const String& author, const String& text)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this, m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
notification, checkable, users, type, cr, author, text)); notification, checkable, users, type, cr, author, text));
} }

View File

@ -41,8 +41,8 @@ public:
protected: protected:
void OnConfigLoaded() override; void OnConfigLoaded() override;
void Start(bool runtimeCreated) override; void Resume() override;
void Stop(bool runtimeRemoved) override; void Pause() override;
private: private:
String m_EventPrefix; String m_EventPrefix;

View File

@ -37,6 +37,9 @@ class ElasticsearchWriter : ConfigObject
[config] int flush_threshold { [config] int flush_threshold {
default {{{ return 1024; }}} default {{{ return 1024; }}}
}; };
[config] bool enable_ha {
default {{{ return true; }}}
};
}; };
} }