diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp
index afc563c12..b9d43bdd9 100644
--- a/lib/perfdata/gelfwriter.cpp
+++ b/lib/perfdata/gelfwriter.cpp
@@ -29,16 +29,55 @@
 #include "base/logger.hpp"
 #include "base/utility.hpp"
 #include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
 #include "base/stream.hpp"
 #include "base/networkstream.hpp"
-#include "base/json.hpp"
 #include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/json.hpp"
+#include "base/statsfunction.hpp"
 #include
 
 using namespace icinga;
 
 REGISTER_TYPE(GelfWriter);
 
+REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
+
+GelfWriter::GelfWriter(void)
+	: m_WorkQueue(10000000, 1)
+{ }
+
+void GelfWriter::OnConfigLoaded(void)
+{
+	ObjectImpl<GelfWriter>::OnConfigLoaded();
+
+	m_WorkQueue.SetName("GelfWriter, " + GetName());
+}
+
+void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+	Dictionary::Ptr nodes = new Dictionary(); /* One stats entry per GelfWriter object, keyed by object name. */
+
+	for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
+		size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
+		double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0; /* GetTaskCount(60) divided by 60. */
+
+		Dictionary::Ptr stats = new Dictionary();
+		stats->Set("work_queue_items", workQueueItems);
+		stats->Set("work_queue_item_rate", workQueueItemRate);
+		stats->Set("connected", gelfwriter->GetConnected());
+		stats->Set("source", gelfwriter->GetSource());
+
+		nodes->Set(gelfwriter->GetName(), stats);
+
+		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
+		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+	}
+
+	status->Set("gelfwriter", nodes);
+}
+
 void GelfWriter::Start(bool runtimeCreated)
 {
 	ObjectImpl<GelfWriter>::Start(runtimeCreated);
@@ -46,18 +85,20 @@ void GelfWriter::Start(bool runtimeCreated)
 	Log(LogInformation, "GelfWriter")
 	    << "'" << GetName() << "' started.";
 
+	/* Register exception handler for WQ tasks. */
+	m_WorkQueue.SetExceptionCallback(boost::bind(&GelfWriter::ExceptionHandler, this, _1));
+
+	/* Timer for reconnecting */
 	m_ReconnectTimer = new Timer();
 	m_ReconnectTimer->SetInterval(10);
 	m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GelfWriter::ReconnectTimerHandler, this));
 	m_ReconnectTimer->Start();
 	m_ReconnectTimer->Reschedule(0);
 
-	// Send check results
-	Service::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
-	// Send notifications
-	Service::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
-	// Send state change
-	Service::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
+	/* Register event handlers. */
+	Checkable::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
+	Checkable::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
+	Checkable::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
 }
 
 void GelfWriter::Stop(bool runtimeRemoved)
@@ -65,41 +106,98 @@ void GelfWriter::Stop(bool runtimeRemoved)
 	Log(LogInformation, "GelfWriter")
 	    << "'" << GetName() << "' stopped.";
 
+	m_WorkQueue.Join();
+
 	ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
 }
 
-void GelfWriter::ReconnectTimerHandler(void)
+void GelfWriter::AssertOnWorkQueue(void)
 {
-	if (m_Stream)
+	ASSERT(m_WorkQueue.IsWorkerThread()); /* Guards the *Internal handlers, Reconnect() and Disconnect(). */
+}
+
+void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+	Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
+
+	Log(LogDebug, "GelfWriter")
+	    << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp);
+
+	if (GetConnected()) { /* Drop the connection; the reconnect timer enqueues Reconnect() again. */
+		m_Stream->Close();
+
+		SetConnected(false);
+	}
+}
+
+void GelfWriter::Reconnect(void)
+{
+	AssertOnWorkQueue();
+
+	double startTime = Utility::GetTime();
+
+	CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
+
+	SetShouldConnect(true);
+
+	if (GetConnected())
 		return;
 
 	TcpSocket::Ptr socket = new TcpSocket();
 
 	Log(LogNotice, "GelfWriter")
-	    << "Reconnecting to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
+	    << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
 	try {
 		socket->Connect(GetHost(), GetPort());
-	} catch (std::exception&) {
+	} catch (const std::exception&) {
 		Log(LogCritical, "GelfWriter")
-		    << "Can't connect to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
-		return;
+		    << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+		throw; /* Rethrow the original exception unsliced so the work queue's exception callback sees the real error. */
 	}
 
 	m_Stream = new NetworkStream(socket);
+
+	SetConnected(true);
+
+	Log(LogInformation, "GelfWriter")
+	    << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+void GelfWriter::ReconnectTimerHandler(void)
+{
+	m_WorkQueue.Enqueue(boost::bind(&GelfWriter::Reconnect, this), PriorityNormal); /* Run Reconnect() as a work queue task. */
+}
+
+void GelfWriter::Disconnect(void)
+{
+	AssertOnWorkQueue();
+
+	if (!GetConnected())
+		return;
+
+	m_Stream->Close();
+
+	SetConnected(false);
 }
 
 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
+	m_WorkQueue.Enqueue(boost::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr)); /* Defer processing to the work queue. */
+}
+
+void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+	AssertOnWorkQueue();
+
 	CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
 
 	Log(LogDebug, "GelfWriter")
-	    << "GELF Processing check result for '" << checkable->GetName() << "'";
+	    << "Processing check result for '" << checkable->GetName() << "'";
 
 	Host::Ptr host;
 	Service::Ptr service;
 	tie(host, service) = GetHostService(checkable);
-	double ts = Utility::GetTime();
 
 	Dictionary::Ptr fields = new Dictionary();
 
@@ -122,6 +220,8 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
 
 	fields->Set("_reachable", checkable->IsReachable());
 
+	double ts = Utility::GetTime();
+
 	if (cr) {
 		fields->Set("_latency", cr->CalculateLatency());
 		fields->Set("_execution_time", cr->CalculateExecutionTime());
@@ -175,28 +275,38 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
 }
 
 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
-    const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
-    const String& author, const String& comment_text, const String& command_name)
+    const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+    const String& author, const String& commentText, const String& commandName)
 {
+	m_WorkQueue.Enqueue(boost::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
+	    notification, checkable, user, notificationType, cr, author, commentText, commandName)); /* Defer processing to the work queue. */
+}
+
+void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+    const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+    const String& author, const String& commentText, const String& commandName)
+{
+	AssertOnWorkQueue();
+
 	CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
 
 	Log(LogDebug, "GelfWriter")
-	    << "GELF Processing notification for '" << checkable->GetName() << "'";
+	    << "Processing notification for '" << checkable->GetName() << "'";
 
 	Host::Ptr host;
 	Service::Ptr service;
 	tie(host, service) = GetHostService(checkable);
-	double ts = Utility::GetTime();
 
-	String notification_type_str = Notification::NotificationTypeToString(notification_type);
+	String notificationTypeString = Notification::NotificationTypeToString(notificationType);
 
-	String author_comment = "";
+	String authorComment = "";
 
-	if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
-		author_comment = author + ";" + comment_text;
+	if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
+		authorComment = author + ";" + commentText;
 	}
 
 	String output;
+	double ts = Utility::GetTime();
 
 	if (cr) {
 		output = CompatUtility::GetCheckResultOutput(cr);
@@ -211,30 +321,37 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
 		fields->Set("short_message", output);
 	} else {
 		fields->Set("_type", "HOST NOTIFICATION");
+		//TODO: why?
 		fields->Set("short_message", "(" + CompatUtility::GetHostStateString(host) + ")");
 	}
 
 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
 
 	fields->Set("_hostname", host->GetName());
-	fields->Set("_command", command_name);
-	fields->Set("_notification_type", notification_type_str);
-	fields->Set("_comment", author_comment);
+	fields->Set("_command", commandName);
+	fields->Set("_notification_type", notificationTypeString);
+	fields->Set("_comment", authorComment);
 
 	SendLogMessage(ComposeGelfMessage(fields, GetSource(), ts));
 }
 
 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
 {
+	m_WorkQueue.Enqueue(boost::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type)); /* Defer processing to the work queue. */
+}
+
+void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+	AssertOnWorkQueue();
+
 	CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
 
 	Log(LogDebug, "GelfWriter")
-	    << "GELF Processing state change for '" << checkable->GetName() << "'";
+	    << "Processing state change for '" << checkable->GetName() << "'";
 
 	Host::Ptr host;
 	Service::Ptr service;
 	tie(host, service) = GetHostService(checkable);
-	double ts = Utility::GetTime();
 
 	Dictionary::Ptr fields = new Dictionary();
 
@@ -254,6 +371,8 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
 		fields->Set("_last_hard_state", host->GetLastHardState());
 	}
 
+	double ts = Utility::GetTime();
+
 	if (cr) {
 		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
 		fields->Set("full_message", cr->GetOutput());
@@ -273,28 +392,28 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
 	return JsonEncode(fields);
 }
 
-void GelfWriter::SendLogMessage(const String& gelf)
+void GelfWriter::SendLogMessage(const String& gelfMessage)
 {
 	std::ostringstream msgbuf;
-	msgbuf << gelf;
+	msgbuf << gelfMessage;
 	msgbuf << '\0';
 
 	String log = msgbuf.str();
 
 	ObjectLock olock(this);
 
-	if (!m_Stream)
+	if (!GetConnected())
 		return;
 
 	try {
-		//TODO remove
 		Log(LogDebug, "GelfWriter")
 		    << "Sending '" << log << "'.";
+
 		m_Stream->Write(log.CStr(), log.GetLength());
 	} catch (const std::exception& ex) {
 		Log(LogCritical, "GelfWriter")
 		    << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
-		m_Stream.reset();
+		throw; /* Rethrow unsliced; ExceptionHandler closes the stream and clears 'connected'. */
 	}
 }
diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp
index 26bb1c426..9cd7d2811 100644
--- a/lib/perfdata/gelfwriter.hpp
+++ b/lib/perfdata/gelfwriter.hpp
@@ -25,13 +25,14 @@
 #include "base/configobject.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/timer.hpp"
+#include "base/workqueue.hpp"
 #include
 
 namespace icinga
 {
 
 /**
- * An Icinga gelf writer.
+ * An Icinga Gelf writer for Graylog.
  *
  * @ingroup perfdata
  */
@@ -41,24 +42,43 @@ public:
 	DECLARE_OBJECT(GelfWriter);
 	DECLARE_OBJECTNAME(GelfWriter);
 
+	GelfWriter(void);
+
+	static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
 protected:
+	virtual void OnConfigLoaded(void) override;
 	virtual void Start(bool runtimeCreated) override;
 	virtual void Stop(bool runtimeRemoved) override;
 
 private:
 	Stream::Ptr m_Stream;
+	WorkQueue m_WorkQueue;
 
 	Timer::Ptr m_ReconnectTimer;
 
 	void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+	void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
 	void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
-	    const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
-	    const String& author, const String& comment_text, const String& command_name);
-	String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+	    const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+	    const String& author, const String& commentText, const String& commandName);
+	void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+	    const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+	    const String& author, const String& commentText, const String& commandName);
 	void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
-	void SendLogMessage(const String& gelf);
+	void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+
+	String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+	void SendLogMessage(const String& gelfMessage);
 
 	void ReconnectTimerHandler(void);
+
+	void Disconnect(void);
+	void Reconnect(void);
+
+	void AssertOnWorkQueue(void);
+
+	void ExceptionHandler(boost::exception_ptr exp);
 };
 
 }
diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti
index 63540daec..f6f06d2ec 100644
--- a/lib/perfdata/gelfwriter.ti
+++ b/lib/perfdata/gelfwriter.ti
@@ -38,6 +38,11 @@ class GelfWriter : ConfigObject
 	[config] bool enable_send_perfdata {
 		default {{{ return false; }}}
 	};
+
+	[no_user_modify] bool connected;
+	[no_user_modify] bool should_connect {
+		default {{{ return true; }}}
+	};
 };
 
 }