GelfWriter: Use async work queue and add feature metric stats

fixes #4532
This commit is contained in:
Michael Friedrich 2017-06-06 19:48:23 +02:00
parent 26d69cfd36
commit f10815efa2
3 changed files with 182 additions and 38 deletions

View File

@ -29,16 +29,55 @@
#include "base/logger.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/json.hpp"
#include "base/context.hpp"
#include "base/exception.hpp"
#include "base/json.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string/replace.hpp>
using namespace icinga;
REGISTER_TYPE(GelfWriter);
REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
GelfWriter::GelfWriter(void)
: m_WorkQueue(10000000, 1)
{ }
void GelfWriter::OnConfigLoaded(void)
{
ObjectImpl<GelfWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GelfWriter, " + GetName());
}
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
stats->Set("connected", gelfwriter->GetConnected());
stats->Set("source", gelfwriter->GetSource());
nodes->Set(gelfwriter->GetName(), stats);
perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("gelfwriter", nodes);
}
void GelfWriter::Start(bool runtimeCreated)
{
ObjectImpl<GelfWriter>::Start(runtimeCreated);
@ -46,18 +85,20 @@ void GelfWriter::Start(bool runtimeCreated)
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' started.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(boost::bind(&GelfWriter::ExceptionHandler, this, _1));
/* Timer for reconnecting */
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GelfWriter::ReconnectTimerHandler, this));
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
// Send check results
Service::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
// Send notifications
Service::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
// Send state change
Service::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
/* Register event handlers. */
Checkable::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
Checkable::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
Checkable::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
}
void GelfWriter::Stop(bool runtimeRemoved)
@ -65,41 +106,98 @@ void GelfWriter::Stop(bool runtimeRemoved)
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
}
void GelfWriter::ReconnectTimerHandler(void)
void GelfWriter::AssertOnWorkQueue(void)
{
if (m_Stream)
ASSERT(m_WorkQueue.IsWorkerThread());
}
void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
Log(LogDebug, "GelfWriter")
<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp);
if (GetConnected()) {
m_Stream->Close();
SetConnected(false);
}
}
void GelfWriter::Reconnect(void)
{
AssertOnWorkQueue();
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
SetShouldConnect(true);
if (GetConnected())
return;
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "GelfWriter")
<< "Reconnecting to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
} catch (std::exception&) {
} catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter")
<< "Can't connect to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
return;
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
m_Stream = new NetworkStream(socket);
SetConnected(true);
Log(LogInformation, "GelfWriter")
<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
void GelfWriter::ReconnectTimerHandler(void)
{
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::Reconnect, this), PriorityNormal);
}
void GelfWriter::Disconnect(void)
{
AssertOnWorkQueue();
if (!GetConnected())
return;
m_Stream->Close();
SetConnected(false);
}
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
}
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
<< "GELF Processing check result for '" << checkable->GetName() << "'";
<< "Processing check result for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
double ts = Utility::GetTime();
Dictionary::Ptr fields = new Dictionary();
@ -122,6 +220,8 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
fields->Set("_reachable", checkable->IsReachable());
double ts = Utility::GetTime();
if (cr) {
fields->Set("_latency", cr->CalculateLatency());
fields->Set("_execution_time", cr->CalculateExecutionTime());
@ -175,28 +275,38 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
}
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
const String& author, const String& comment_text, const String& command_name)
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName)
{
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
notification, checkable, user, notificationType, cr, author, commentText, commandName));
}
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
<< "GELF Processing notification for '" << checkable->GetName() << "'";
<< "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
double ts = Utility::GetTime();
String notification_type_str = Notification::NotificationTypeToString(notification_type);
String notificationTypeString = Notification::NotificationTypeToString(notificationType);
String author_comment = "";
String authorComment = "";
if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
author_comment = author + ";" + comment_text;
if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
authorComment = author + ";" + commentText;
}
String output;
double ts = Utility::GetTime();
if (cr) {
output = CompatUtility::GetCheckResultOutput(cr);
@ -211,30 +321,37 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
fields->Set("short_message", output);
} else {
fields->Set("_type", "HOST NOTIFICATION");
//TODO: why?
fields->Set("short_message", "(" + CompatUtility::GetHostStateString(host) + ")");
}
fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
fields->Set("_hostname", host->GetName());
fields->Set("_command", command_name);
fields->Set("_notification_type", notification_type_str);
fields->Set("_comment", author_comment);
fields->Set("_command", commandName);
fields->Set("_notification_type", notificationTypeString);
fields->Set("_comment", authorComment);
SendLogMessage(ComposeGelfMessage(fields, GetSource(), ts));
}
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
<< "GELF Processing state change for '" << checkable->GetName() << "'";
<< "Processing state change for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
double ts = Utility::GetTime();
Dictionary::Ptr fields = new Dictionary();
@ -254,6 +371,8 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
fields->Set("_last_hard_state", host->GetLastHardState());
}
double ts = Utility::GetTime();
if (cr) {
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
@ -273,28 +392,28 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
return JsonEncode(fields);
}
void GelfWriter::SendLogMessage(const String& gelf)
void GelfWriter::SendLogMessage(const String& gelfMessage)
{
std::ostringstream msgbuf;
msgbuf << gelf;
msgbuf << gelfMessage;
msgbuf << '\0';
String log = msgbuf.str();
ObjectLock olock(this);
if (!m_Stream)
if (!GetConnected())
return;
try {
//TODO remove
Log(LogDebug, "GelfWriter")
<< "Sending '" << log << "'.";
m_Stream->Write(log.CStr(), log.GetLength());
} catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
m_Stream.reset();
throw ex;
}
}

View File

@ -25,13 +25,14 @@
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <fstream>
namespace icinga
{
/**
* An Icinga gelf writer.
* An Icinga Gelf writer for Graylog.
*
* @ingroup perfdata
*/
@ -41,24 +42,43 @@ public:
DECLARE_OBJECT(GelfWriter);
DECLARE_OBJECTNAME(GelfWriter);
GelfWriter(void);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
protected:
virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
Stream::Ptr m_Stream;
WorkQueue m_WorkQueue;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
const String& author, const String& commentText, const String& commandName);
void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
const User::Ptr& user, NotificationType notification_type, const CheckResult::Ptr& cr,
const String& author, const String& comment_text, const String& command_name);
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void SendLogMessage(const String& gelf);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const String& gelfMessage);
void ReconnectTimerHandler(void);
void Disconnect(void);
void Reconnect(void);
void AssertOnWorkQueue(void);
void ExceptionHandler(boost::exception_ptr exp);
};
}

View File

@ -38,6 +38,11 @@ class GelfWriter : ConfigObject
[config] bool enable_send_perfdata {
default {{{ return false; }}}
};
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
};
}