mirror of https://github.com/Icinga/icinga2.git
Merge pull request #5329 from Icinga/feature/gelfwriter-workqueue
GelfWriter: Use async work queue and add feature metric stats fixes #4532
This commit is contained in:
commit
13b516413b
|
@ -29,16 +29,55 @@
|
|||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/context.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
REGISTER_TYPE(GelfWriter);
|
||||
|
||||
REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
|
||||
|
||||
GelfWriter::GelfWriter(void)
|
||||
: m_WorkQueue(10000000, 1)
|
||||
{ }
|
||||
|
||||
void GelfWriter::OnConfigLoaded(void)
|
||||
{
|
||||
ObjectImpl<GelfWriter>::OnConfigLoaded();
|
||||
|
||||
m_WorkQueue.SetName("GelfWriter, " + GetName());
|
||||
}
|
||||
|
||||
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
|
||||
{
|
||||
Dictionary::Ptr nodes = new Dictionary();
|
||||
|
||||
for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
|
||||
size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
|
||||
Dictionary::Ptr stats = new Dictionary();
|
||||
stats->Set("work_queue_items", workQueueItems);
|
||||
stats->Set("work_queue_item_rate", workQueueItemRate);
|
||||
stats->Set("connected", gelfwriter->GetConnected());
|
||||
stats->Set("source", gelfwriter->GetSource());
|
||||
|
||||
nodes->Set(gelfwriter->GetName(), stats);
|
||||
|
||||
perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
|
||||
perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
|
||||
}
|
||||
|
||||
status->Set("gelfwriter", nodes);
|
||||
}
|
||||
|
||||
void GelfWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl<GelfWriter>::Start(runtimeCreated);
|
||||
|
@ -46,18 +85,20 @@ void GelfWriter::Start(bool runtimeCreated)
|
|||
Log(LogInformation, "GelfWriter")
|
||||
<< "'" << GetName() << "' started.";
|
||||
|
||||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback(boost::bind(&GelfWriter::ExceptionHandler, this, _1));
|
||||
|
||||
/* Timer for reconnecting */
|
||||
m_ReconnectTimer = new Timer();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GelfWriter::ReconnectTimerHandler, this));
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
// Send check results
|
||||
Service::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
|
||||
// Send notifications
|
||||
Service::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
|
||||
// Send state change
|
||||
Service::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
/* Register event handlers. */
|
||||
Checkable::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
|
||||
Checkable::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
|
||||
Checkable::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
}
|
||||
|
||||
void GelfWriter::Stop(bool runtimeRemoved)
|
||||
|
@ -65,41 +106,98 @@ void GelfWriter::Stop(bool runtimeRemoved)
|
|||
Log(LogInformation, "GelfWriter")
|
||||
<< "'" << GetName() << "' stopped.";
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectTimerHandler(void)
|
||||
void GelfWriter::AssertOnWorkQueue(void)
|
||||
{
|
||||
if (m_Stream)
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
}
|
||||
|
||||
void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
|
||||
{
|
||||
Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp);
|
||||
|
||||
if (GetConnected()) {
|
||||
m_Stream->Close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
}
|
||||
|
||||
void GelfWriter::Reconnect(void)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
TcpSocket::Ptr socket = new TcpSocket();
|
||||
|
||||
Log(LogNotice, "GelfWriter")
|
||||
<< "Reconnecting to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
try {
|
||||
socket->Connect(GetHost(), GetPort());
|
||||
} catch (std::exception&) {
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "Can't connect to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
return;
|
||||
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw ex;
|
||||
}
|
||||
|
||||
m_Stream = new NetworkStream(socket);
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "GelfWriter")
|
||||
<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectTimerHandler(void)
|
||||
{
|
||||
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::Reconnect, this), PriorityNormal);
|
||||
}
|
||||
|
||||
void GelfWriter::Disconnect(void)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
m_Stream->Close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
|
||||
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
{
|
||||
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
|
||||
}
|
||||
|
||||
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "GELF Processing check result for '" << checkable->GetName() << "'";
|
||||
<< "Processing check result for '" << checkable->GetName() << "'";
|
||||
|
||||
Host::Ptr host;
|
||||
Service::Ptr service;
|
||||
tie(host, service) = GetHostService(checkable);
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
Dictionary::Ptr fields = new Dictionary();
|
||||
|
||||
|
@ -122,6 +220,8 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
|
|||
|
||||
fields->Set("_reachable", checkable->IsReachable());
|
||||
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
if (cr) {
|
||||
fields->Set("_latency", cr->CalculateLatency());
|
||||
fields->Set("_execution_time", cr->CalculateExecutionTime());
|
||||
|
@ -175,28 +275,38 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
|
|||
}
|
||||
|
||||
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
|
||||
const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
|
||||
const String& author, const String& comment_text, const String& command_name)
|
||||
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
|
||||
const String& author, const String& commentText, const String& commandName)
|
||||
{
|
||||
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
|
||||
notification, checkable, user, notificationType, cr, author, commentText, commandName));
|
||||
}
|
||||
|
||||
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
|
||||
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
|
||||
const String& author, const String& commentText, const String& commandName)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "GELF Processing notification for '" << checkable->GetName() << "'";
|
||||
<< "Processing notification for '" << checkable->GetName() << "'";
|
||||
|
||||
Host::Ptr host;
|
||||
Service::Ptr service;
|
||||
tie(host, service) = GetHostService(checkable);
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
String notification_type_str = Notification::NotificationTypeToString(notification_type);
|
||||
String notificationTypeString = Notification::NotificationTypeToString(notificationType);
|
||||
|
||||
String author_comment = "";
|
||||
String authorComment = "";
|
||||
|
||||
if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
|
||||
author_comment = author + ";" + comment_text;
|
||||
if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
|
||||
authorComment = author + ";" + commentText;
|
||||
}
|
||||
|
||||
String output;
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
if (cr) {
|
||||
output = CompatUtility::GetCheckResultOutput(cr);
|
||||
|
@ -211,30 +321,37 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
|
|||
fields->Set("short_message", output);
|
||||
} else {
|
||||
fields->Set("_type", "HOST NOTIFICATION");
|
||||
//TODO: why?
|
||||
fields->Set("short_message", "(" + CompatUtility::GetHostStateString(host) + ")");
|
||||
}
|
||||
|
||||
fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
|
||||
|
||||
fields->Set("_hostname", host->GetName());
|
||||
fields->Set("_command", command_name);
|
||||
fields->Set("_notification_type", notification_type_str);
|
||||
fields->Set("_comment", author_comment);
|
||||
fields->Set("_command", commandName);
|
||||
fields->Set("_notification_type", notificationTypeString);
|
||||
fields->Set("_comment", authorComment);
|
||||
|
||||
SendLogMessage(ComposeGelfMessage(fields, GetSource(), ts));
|
||||
}
|
||||
|
||||
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
||||
{
|
||||
m_WorkQueue.Enqueue(boost::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
|
||||
}
|
||||
|
||||
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "GELF Processing state change for '" << checkable->GetName() << "'";
|
||||
<< "Processing state change for '" << checkable->GetName() << "'";
|
||||
|
||||
Host::Ptr host;
|
||||
Service::Ptr service;
|
||||
tie(host, service) = GetHostService(checkable);
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
Dictionary::Ptr fields = new Dictionary();
|
||||
|
||||
|
@ -254,6 +371,8 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
|
|||
fields->Set("_last_hard_state", host->GetLastHardState());
|
||||
}
|
||||
|
||||
double ts = Utility::GetTime();
|
||||
|
||||
if (cr) {
|
||||
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
|
||||
fields->Set("full_message", cr->GetOutput());
|
||||
|
@ -273,28 +392,28 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
|
|||
return JsonEncode(fields);
|
||||
}
|
||||
|
||||
void GelfWriter::SendLogMessage(const String& gelf)
|
||||
void GelfWriter::SendLogMessage(const String& gelfMessage)
|
||||
{
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << gelf;
|
||||
msgbuf << gelfMessage;
|
||||
msgbuf << '\0';
|
||||
|
||||
String log = msgbuf.str();
|
||||
|
||||
ObjectLock olock(this);
|
||||
|
||||
if (!m_Stream)
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
try {
|
||||
//TODO remove
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Sending '" << log << "'.";
|
||||
|
||||
m_Stream->Write(log.CStr(), log.GetLength());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
m_Stream.reset();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,13 +25,14 @@
|
|||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* An Icinga gelf writer.
|
||||
* An Icinga Gelf writer for Graylog.
|
||||
*
|
||||
* @ingroup perfdata
|
||||
*/
|
||||
|
@ -41,24 +42,43 @@ public:
|
|||
DECLARE_OBJECT(GelfWriter);
|
||||
DECLARE_OBJECTNAME(GelfWriter);
|
||||
|
||||
GelfWriter(void);
|
||||
|
||||
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
||||
|
||||
protected:
|
||||
virtual void OnConfigLoaded(void) override;
|
||||
virtual void Start(bool runtimeCreated) override;
|
||||
virtual void Stop(bool runtimeRemoved) override;
|
||||
|
||||
private:
|
||||
Stream::Ptr m_Stream;
|
||||
WorkQueue m_WorkQueue;
|
||||
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
|
||||
const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
|
||||
const String& author, const String& comment_text, const String& command_name);
|
||||
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
|
||||
const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
|
||||
const String& author, const String& commentText, const String& commandName);
|
||||
void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
|
||||
const User::Ptr& user, NotificationType notification_type, const CheckResult::Ptr& cr,
|
||||
const String& author, const String& comment_text, const String& command_name);
|
||||
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
|
||||
void SendLogMessage(const String& gelf);
|
||||
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
|
||||
|
||||
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
|
||||
void SendLogMessage(const String& gelfMessage);
|
||||
|
||||
void ReconnectTimerHandler(void);
|
||||
|
||||
void Disconnect(void);
|
||||
void Reconnect(void);
|
||||
|
||||
void AssertOnWorkQueue(void);
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -38,6 +38,11 @@ class GelfWriter : ConfigObject
|
|||
[config] bool enable_send_perfdata {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
};
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue