Implement HA functionality for Gelf feature

This commit is contained in:
Michael Friedrich 2018-10-24 13:43:38 +02:00
parent 5e241dc815
commit b905999f4b
3 changed files with 34 additions and 8 deletions

View File

@ -51,6 +51,15 @@ void GelfWriter::OnConfigLoaded()
ObjectImpl<GelfWriter>::OnConfigLoaded(); ObjectImpl<GelfWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GelfWriter, " + GetName()); m_WorkQueue.SetName("GelfWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "GelfWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
} }
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -75,12 +84,12 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
status->Set("gelfwriter", new Dictionary(std::move(nodes))); status->Set("gelfwriter", new Dictionary(std::move(nodes)));
} }
void GelfWriter::Start(bool runtimeCreated) void GelfWriter::Resume()
{ {
ObjectImpl<GelfWriter>::Start(runtimeCreated); ObjectImpl<GelfWriter>::Resume();
Log(LogInformation, "GelfWriter") Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' started."; << "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */ /* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1)); m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
@ -98,14 +107,14 @@ void GelfWriter::Start(bool runtimeCreated)
Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3)); Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
} }
void GelfWriter::Stop(bool runtimeRemoved) void GelfWriter::Pause()
{ {
Log(LogInformation, "GelfWriter") Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' stopped."; << "'" << GetName() << "' paused.";
m_WorkQueue.Join(); m_WorkQueue.Join();
ObjectImpl<GelfWriter>::Stop(runtimeRemoved); ObjectImpl<GelfWriter>::Pause();
} }
void GelfWriter::AssertOnWorkQueue() void GelfWriter::AssertOnWorkQueue()
@ -131,6 +140,11 @@ void GelfWriter::Reconnect()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
double startTime = Utility::GetTime(); double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'"); CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
@ -180,6 +194,9 @@ void GelfWriter::Disconnect()
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr)); m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
} }
@ -284,6 +301,9 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr, const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName) const String& author, const String& commentText, const String& commandName)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this, m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
notification, checkable, user, notificationType, cr, author, commentText, commandName)); notification, checkable, user, notificationType, cr, author, commentText, commandName));
} }
@ -348,6 +368,9 @@ void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& noti
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{ {
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type)); m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
} }

View File

@ -46,8 +46,8 @@ public:
protected: protected:
void OnConfigLoaded() override; void OnConfigLoaded() override;
void Start(bool runtimeCreated) override; void Resume() override;
void Stop(bool runtimeRemoved) override; void Pause() override;
private: private:
Stream::Ptr m_Stream; Stream::Ptr m_Stream;

View File

@ -45,6 +45,9 @@ class GelfWriter : ConfigObject
[no_user_modify] bool should_connect { [no_user_modify] bool should_connect {
default {{{ return true; }}} default {{{ return true; }}}
}; };
[config] bool enable_ha {
default {{{ return true; }}}
};
}; };
} }