icinga2/lib/perfdata/gelfwriter.cpp
Alexander A. Klimov ba7102cae3 Explicitly stop started timers and wait for them
before permitting their parent objects' destruction.
For the cases where the handlers have raw pointers to these objects.
2023-04-14 14:52:04 +02:00

539 lines
16 KiB
C++

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "perfdata/gelfwriter.hpp"
#include "perfdata/gelfwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/notification.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/context.hpp"
#include "base/exception.hpp"
#include "base/json.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string/replace.hpp>
#include <utility>
#include "base/io-engine.hpp"
#include <boost/asio/write.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/system/error_code.hpp>
#include <boost/asio/error.hpp>
using namespace icinga;
/* Registers the GelfWriter type (presumably with the config type registry so "object GelfWriter" definitions can be instantiated — confirm against base/type.hpp). */
REGISTER_TYPE(GelfWriter);
/* Registers GelfWriter::StatsFunc as the status/perfdata provider for this type. */
REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
/**
 * Called after the object's configuration has been loaded.
 *
 * Names the work queue after this object and selects the HA mode: with
 * enable_ha turned off the writer runs on every endpoint (HARunEverywhere),
 * otherwise only once within the HA zone (HARunOnce).
 */
void GelfWriter::OnConfigLoaded()
{
	ObjectImpl<GelfWriter>::OnConfigLoaded();

	m_WorkQueue.SetName("GelfWriter, " + GetName());

	if (GetEnableHa()) {
		SetHAMode(HARunOnce);
		return;
	}

	Log(LogDebug, "GelfWriter")
		<< "HA functionality disabled. Won't pause connection: " << GetName();

	SetHAMode(HARunEverywhere);
}
/**
 * Collects runtime statistics for every GelfWriter object.
 *
 * For each writer, adds an entry under status["gelfwriter"][<name>] with the
 * work queue length, the work queue item rate (tasks over the last 60 seconds
 * divided by 60), the connection state and the configured source. Also appends
 * "gelfwriter_<name>_work_queue_items" and "..._work_queue_item_rate"
 * performance data values.
 *
 * @param status Dictionary receiving the "gelfwriter" status entry.
 * @param perfdata Array receiving the performance data values.
 */
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
	DictionaryData nodes;

	for (const GelfWriter::Ptr& writer : ConfigType::GetObjectsByType<GelfWriter>()) {
		size_t queueLength = writer->m_WorkQueue.GetLength();
		double queueRate = writer->m_WorkQueue.GetTaskCount(60) / 60.0;

		Dictionary::Ptr node = new Dictionary({
			{ "work_queue_items", queueLength },
			{ "work_queue_item_rate", queueRate },
			{ "connected", writer->GetConnected() },
			{ "source", writer->GetSource() }
		});

		nodes.emplace_back(writer->GetName(), node);

		String prefix = "gelfwriter_" + writer->GetName();
		perfdata->Add(new PerfdataValue(prefix + "_work_queue_items", queueLength));
		perfdata->Add(new PerfdataValue(prefix + "_work_queue_item_rate", queueRate));
	}

	status->Set("gelfwriter", new Dictionary(std::move(nodes)));
}
/**
 * Activates the writer, either initially or when resumed at runtime under HA.
 *
 * Installs the work queue exception handler, creates and starts a reconnect
 * timer with a 10 second interval, and subscribes to check result,
 * notification and state change signals. The signal connections are kept so
 * Pause() can disconnect them again.
 */
void GelfWriter::Resume()
{
	ObjectImpl<GelfWriter>::Resume();

	Log(LogInformation, "GelfWriter")
		<< "'" << GetName() << "' resumed.";

	/* Exceptions escaping a work queue task are routed to ExceptionHandler(). */
	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) {
		ExceptionHandler(std::move(exp));
	});

	/* Periodic reconnect attempts; Reschedule(0) presumably makes the first one happen right away. */
	m_ReconnectTimer = Timer::Create();
	m_ReconnectTimer->SetInterval(10);
	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) {
		ReconnectTimerHandler();
	});
	m_ReconnectTimer->Start();
	m_ReconnectTimer->Reschedule(0);

	/* Event subscriptions; each handler forwards the event into the work queue. */
	m_HandleCheckResults = Checkable::OnNewCheckResult.connect(
		[this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
			CheckResultHandler(checkable, cr);
		}
	);

	m_HandleNotifications = Checkable::OnNotificationSentToUser.connect(
		[this](const Notification::Ptr& notification, const Checkable::Ptr& checkable, const User::Ptr& user,
			const NotificationType& type, const CheckResult::Ptr& cr, const String& author,
			const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
			NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
		}
	);

	m_HandleStateChanges = Checkable::OnStateChange.connect(
		[this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
			StateChangeHandler(checkable, cr, type);
		}
	);
}
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
/*
 * Deactivates the writer.
 *
 * Disconnects the event signal handlers installed by Resume() so no new work
 * is queued, then stops the reconnect timer. Stop(true) is the "stop and wait"
 * variant introduced by the commit noted in this file's header; NOTE(review):
 * confirm it waits for a running ReconnectTimerHandler() before returning.
 *
 * It then tries to ensure a connection directly (not via the work queue) so the
 * tasks still queued can be flushed:
 *  - if connecting throws, the work queue is NOT joined and queued data may be
 *    lost (as the log message states);
 *  - otherwise the work queue is joined, the connection is closed and the
 *    pause is logged.
 * In both cases the base class Pause() runs last.
 */
void GelfWriter::Pause()
{
	m_HandleCheckResults.disconnect();
	m_HandleNotifications.disconnect();
	m_HandleStateChanges.disconnect();
	m_ReconnectTimer->Stop(true);
	try {
		// Connect (if not already connected) so the remaining queue can be written out.
		ReconnectInternal();
	} catch (const std::exception&) {
		Log(LogInformation, "GelfWriter")
			<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
		ObjectImpl<GelfWriter>::Pause();
		return;
	}
	// Wait for all queued tasks (check results, notifications, ...) to be sent.
	m_WorkQueue.Join();
	DisconnectInternal();
	Log(LogInformation, "GelfWriter")
		<< "'" << GetName() << "' paused.";
	ObjectImpl<GelfWriter>::Pause();
}
/**
 * Asserts (via ASSERT) that the calling thread is this writer's work queue
 * worker. Methods that touch the connection state call this first.
 */
void GelfWriter::AssertOnWorkQueue()
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}
/**
 * Work queue exception callback.
 *
 * Logs the exception (short form at critical level, full diagnostics at debug
 * level) and drops the current connection so the next reconnect attempt
 * starts from scratch.
 *
 * @param exp The exception that escaped a work queue task.
 */
void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
{
	Log(LogCritical, "GelfWriter")
		<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);

	Log(LogDebug, "GelfWriter")
		<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);

	DisconnectInternal();
}
/**
 * Work queue task scheduled by the reconnect timer.
 *
 * While paused it only marks the writer as disconnected; otherwise it
 * delegates to ReconnectInternal(), whose exceptions propagate to the work
 * queue's exception callback.
 */
void GelfWriter::Reconnect()
{
	AssertOnWorkQueue();

	if (!IsPaused()) {
		ReconnectInternal();
		return;
	}

	SetConnected(false);
}
/**
 * Connects to the configured Graylog GELF endpoint unless already connected.
 *
 * Always sets should_connect first. With enable_tls, an SSL context is built
 * from cert_path/key_path/ca_path and a TLS handshake follows the TCP connect.
 * Unless insecure_noverify is set, the peer must present a certificate that
 * passes verification. On success the writer is marked as connected and the
 * elapsed time is logged.
 *
 * @throws std::exception rethrown unchanged (after a warning is logged) when
 *         creating the SSL context, connecting or the TLS handshake fails.
 * @throws std::runtime_error when the peer presents no certificate or it fails
 *         validation.
 */
void GelfWriter::ReconnectInternal()
{
	double startTime = Utility::GetTime();

	CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");

	SetShouldConnect(true);

	if (GetConnected())
		return;

	Log(LogNotice, "GelfWriter")
		<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";

	bool ssl = GetEnableTls();

	if (ssl) {
		Shared<boost::asio::ssl::context>::Ptr sslContext;

		try {
			sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
		} catch (const std::exception&) {
			Log(LogWarning, "GelfWriter")
				<< "Unable to create SSL context.";
			throw;
		}

		m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
	} else {
		m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
	}

	try {
		icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
	} catch (const std::exception&) {
		/* Closing quote belongs after the port, not after the full stop. */
		Log(LogWarning, "GelfWriter")
			<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
		throw;
	}

	if (ssl) {
		auto& tlsStream (m_Stream.first->next_layer());

		try {
			tlsStream.handshake(tlsStream.client);
		} catch (const std::exception&) {
			/* Quote the host name only, matching the other messages. */
			Log(LogWarning, "GelfWriter")
				<< "TLS handshake with host '" << GetHost() << "' failed.";
			throw;
		}

		if (!GetInsecureNoverify()) {
			if (!tlsStream.GetPeerCertificate()) {
				BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
			}

			if (!tlsStream.IsVerifyOK()) {
				BOOST_THROW_EXCEPTION(std::runtime_error(
					"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
				));
			}
		}
	}

	SetConnected(true);

	Log(LogInformation, "GelfWriter")
		<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
 * Reconnect timer callback: defers the actual reconnect to the work queue
 * (normal priority) so all connection handling happens on one thread.
 */
void GelfWriter::ReconnectTimerHandler()
{
	m_WorkQueue.Enqueue(
		[this]() {
			Reconnect();
		},
		PriorityNormal
	);
}
/**
 * Work-queue-only entry point for closing the connection; asserts the thread
 * and delegates to DisconnectInternal().
 */
void GelfWriter::Disconnect()
{
	AssertOnWorkQueue();

	DisconnectInternal();
}
void GelfWriter::DisconnectInternal()
{
if (!GetConnected())
return;
if (m_Stream.first) {
boost::system::error_code ec;
m_Stream.first->next_layer().shutdown(ec);
// https://stackoverflow.com/a/25703699
// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
if (ec.category() == boost::asio::error::get_ssl_category()) {
Log(LogCritical, "GelfWriter")
<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
}
} else if (m_Stream.second) {
m_Stream.second->close();
}
SetConnected(false);
}
/**
 * Signal handler for new check results: unless paused, queues
 * CheckResultHandlerInternal() on the work queue.
 */
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	if (!IsPaused()) {
		m_WorkQueue.Enqueue([this, checkable, cr]() {
			CheckResultHandlerInternal(checkable, cr);
		});
	}
}
/**
 * Builds a GELF "CHECK RESULT" message for a checkable and sends it.
 *
 * Must run on the work queue. The message carries host/service state, last
 * (hard) state, check attempt counters, reachability and the check command
 * name (when a check command is set). With a check result it also carries
 * latency, execution time, output and check source, and the GELF timestamp
 * is the result's execution end; without one the current time is used.
 *
 * With enable_send_perfdata, every performance data value becomes a field
 * "_<label>" plus "_<label>_min/_max/_warn/_crit/_unit" where set. In the
 * label, ' ', '.' and '\' are replaced by '_', and "::" by '.'. Unparsable
 * values are skipped with a warning.
 *
 * @param checkable Host or service the check result belongs to.
 * @param cr The check result; may be null.
 */
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing check result for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	Dictionary::Ptr fields = new Dictionary();

	if (service) {
		fields->Set("_service_name", service->GetShortName());
		fields->Set("_service_state", Service::StateToString(service->GetState()));
		fields->Set("_last_state", service->GetLastState());
		fields->Set("_last_hard_state", service->GetLastHardState());
	} else {
		fields->Set("_last_state", host->GetLastState());
		fields->Set("_last_hard_state", host->GetLastHardState());
	}

	fields->Set("_hostname", host->GetName());
	fields->Set("_type", "CHECK RESULT");
	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));

	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());

	fields->Set("_reachable", checkable->IsReachable());

	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();

	/* The check command may be unset (see the null check below); keep its name
	 * (or an empty string) for the perfdata warning instead of dereferencing it. */
	String checkCommandName;

	if (checkCommand) {
		checkCommandName = checkCommand->GetName();
		fields->Set("_check_command", checkCommandName);
	}

	double ts = Utility::GetTime();

	if (cr) {
		fields->Set("_latency", cr->CalculateLatency());
		fields->Set("_execution_time", cr->CalculateExecutionTime());
		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
		fields->Set("full_message", cr->GetOutput());
		fields->Set("_check_source", cr->GetCheckSource());
		ts = cr->GetExecutionEnd();
	}

	if (cr && GetEnableSendPerfdata()) {
		Array::Ptr perfdata = cr->GetPerformanceData();

		if (perfdata) {
			ObjectLock olock(perfdata);

			for (const Value& val : perfdata) {
				PerfdataValue::Ptr pdv;

				if (val.IsObjectType<PerfdataValue>())
					pdv = val;
				else {
					try {
						pdv = PerfdataValue::Parse(val);
					} catch (const std::exception&) {
						Log(LogWarning, "GelfWriter")
							<< "Ignoring invalid perfdata for checkable '"
							<< checkable->GetName() << "' and command '"
							<< checkCommandName << "' with value: " << val;
						continue;
					}
				}

				String escaped_key = pdv->GetLabel();
				boost::replace_all(escaped_key, " ", "_");
				boost::replace_all(escaped_key, ".", "_");
				boost::replace_all(escaped_key, "\\", "_");
				boost::replace_all(escaped_key, "::", ".");

				fields->Set("_" + escaped_key, pdv->GetValue());

				if (!pdv->GetMin().IsEmpty())
					fields->Set("_" + escaped_key + "_min", pdv->GetMin());
				if (!pdv->GetMax().IsEmpty())
					fields->Set("_" + escaped_key + "_max", pdv->GetMax());
				if (!pdv->GetWarn().IsEmpty())
					fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
				if (!pdv->GetCrit().IsEmpty())
					fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
				if (!pdv->GetUnit().IsEmpty())
					fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
			}
		}
	}

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}
/**
 * Signal handler for notifications sent to a user: unless paused, queues
 * NotificationToUserHandlerInternal() with copies of all arguments.
 */
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
	const String& author, const String& commentText, const String& commandName)
{
	if (!IsPaused()) {
		m_WorkQueue.Enqueue(
			[this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
				NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr,
					author, commentText, commandName);
			}
		);
	}
}
/**
 * Builds a GELF "SERVICE NOTIFICATION" / "HOST NOTIFICATION" message and sends it.
 *
 * Must run on the work queue. short_message is the check result output (empty
 * without a check result). The GELF timestamp is the result's execution end,
 * or the current time without one. "_comment" is "author;comment" for custom
 * notifications and acknowledgements and empty otherwise.
 */
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
	const String& author, const String& commentText, const String& commandName)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing notification for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	//TODO: Change that to our own types.
	String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType);

	String authorComment = "";

	bool carriesComment = notificationType == NotificationCustom || notificationType == NotificationAcknowledgement;

	if (carriesComment)
		authorComment = author + ";" + commentText;

	String output;
	double ts = Utility::GetTime();

	if (cr) {
		output = CompatUtility::GetCheckResultOutput(cr);
		ts = cr->GetExecutionEnd();
	}

	Dictionary::Ptr fields = new Dictionary();

	if (!service) {
		fields->Set("_type", "HOST NOTIFICATION");
	} else {
		fields->Set("_type", "SERVICE NOTIFICATION");
		//TODO: fix this to _service_name
		fields->Set("_service", service->GetShortName());
	}

	fields->Set("short_message", output);

	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
	fields->Set("_hostname", host->GetName());
	fields->Set("_command", commandName);
	fields->Set("_notification_type", notificationTypeString);
	fields->Set("_comment", authorComment);

	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();

	if (commandObj)
		fields->Set("_check_command", commandObj->GetName());

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}
/**
 * Signal handler for state changes: unless paused, queues
 * StateChangeHandlerInternal() on the work queue.
 */
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
	if (!IsPaused()) {
		m_WorkQueue.Enqueue([this, checkable, cr, type]() {
			StateChangeHandlerInternal(checkable, cr, type);
		});
	}
}
/**
 * Builds a GELF "STATE CHANGE" message for a checkable and sends it.
 *
 * Must run on the work queue. Carries current/last/last hard state, check
 * attempt counters, the check command name (if set) and, with a check result,
 * its output and check source. The GELF timestamp is the result's execution
 * end, or the current time without one. The state type parameter is not used.
 */
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing state change for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	Dictionary::Ptr fields = new Dictionary();

	fields->Set("_type", "STATE CHANGE");
	fields->Set("_hostname", host->GetName());
	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());

	if (!service) {
		fields->Set("_state", Host::StateToString(host->GetState()));
		fields->Set("_last_state", host->GetLastState());
		fields->Set("_last_hard_state", host->GetLastHardState());
	} else {
		String serviceState = Service::StateToString(service->GetState());

		fields->Set("_state", serviceState);
		fields->Set("_service_name", service->GetShortName());
		fields->Set("_service_state", serviceState);
		fields->Set("_last_state", service->GetLastState());
		fields->Set("_last_hard_state", service->GetLastHardState());
	}

	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();

	if (commandObj)
		fields->Set("_check_command", commandObj->GetName());

	double ts = Utility::GetTime();

	if (cr) {
		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
		fields->Set("full_message", cr->GetOutput());
		fields->Set("_check_source", cr->GetCheckSource());
		ts = cr->GetExecutionEnd();
	}

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}
/**
 * Adds the GELF envelope fields and serializes the message to JSON.
 *
 * Note: "version" (1.1), "host" (= source) and "timestamp" (= ts) are written
 * into the caller's dictionary, which is modified in place.
 *
 * @param fields Message fields; extended in place.
 * @param source Value for the GELF "host" field.
 * @param ts Value for the GELF "timestamp" field.
 * @return The JSON encoding of fields.
 */
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
{
	const Dictionary::Ptr& message = fields;

	message->Set("version", "1.1");
	message->Set("host", source);
	message->Set("timestamp", ts);

	String encoded = JsonEncode(message);
	return encoded;
}
/**
 * Writes a single GELF message to the Graylog connection.
 *
 * The message is terminated with a NUL byte (the GELF-over-TCP frame
 * delimiter) and written and flushed under this object's lock. Nothing is
 * sent when not connected.
 *
 * @param checkable The checkable the message is about (used for logging only).
 * @param gelfMessage The JSON-encoded GELF message.
 * @throws std::exception rethrown unchanged (after a critical log entry) if
 *         writing or flushing fails; the work queue's exception callback then
 *         drops the connection.
 */
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
{
	std::ostringstream msgbuf;
	msgbuf << gelfMessage;
	msgbuf << '\0';

	/* Materialize the framed payload once; it is used for logging and for writing. */
	const std::string log = msgbuf.str();

	ObjectLock olock(this);

	if (!GetConnected())
		return;

	try {
		Log(LogDebug, "GelfWriter")
			<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";

		if (m_Stream.first) {
			boost::asio::write(*m_Stream.first, boost::asio::buffer(log));
			m_Stream.first->flush();
		} else {
			boost::asio::write(*m_Stream.second, boost::asio::buffer(log));
			m_Stream.second->flush();
		}
	} catch (const std::exception&) {
		Log(LogCritical, "GelfWriter")
			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";

		/* Rethrow the original exception object. "throw ex;" would copy-slice it to a
		 * plain std::exception and lose its dynamic type and boost diagnostic info. */
		throw;
	}
}