From bd5ff814f23a11093832e59f0716ff151f68eea5 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Tue, 24 Jan 2017 12:10:37 +0100 Subject: [PATCH] Review LogstashWriter feature implementation refs #4054 --- etc/icinga2/features-available/logstash.conf | 3 +- lib/base/socket.cpp | 27 +- lib/base/socket.hpp | 11 +- lib/base/tcpsocket.cpp | 104 +----- lib/base/tcpsocket.hpp | 5 +- lib/base/udpsocket.cpp | 11 +- lib/base/udpsocket.hpp | 8 +- lib/perfdata/logstashwriter.cpp | 334 ++++++++++--------- lib/perfdata/logstashwriter.hpp | 31 +- lib/perfdata/logstashwriter.ti | 10 +- 10 files changed, 244 insertions(+), 300 deletions(-) diff --git a/etc/icinga2/features-available/logstash.conf b/etc/icinga2/features-available/logstash.conf index 6a08f16fc..2c506ea7e 100644 --- a/etc/icinga2/features-available/logstash.conf +++ b/etc/icinga2/features-available/logstash.conf @@ -8,6 +8,5 @@ library "perfdata" object LogstashWriter "logstash" { //host = "127.0.0.1" //port = 9201 - /* default is tcp */ - //defaultProtocol = true + //socket_type = "udp" } diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index 2016cb76d..ed9d4ebd8 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -38,14 +38,21 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET), m_SocketType(SOCK_STREAM), m_Protocol(IPPROTO_TCP) +{ } + +/** + * Constructor for the Socket class. + */ +Socket::Socket(int socketType, int protocol) + : m_FD(INVALID_SOCKET), m_SocketType(socketType), m_Protocol(protocol) { } /** * Constructor for the Socket class. */ Socket::Socket(SOCKET fd) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET) { SetFD(fd); } @@ -330,6 +337,9 @@ size_t Socket::Read(void *buffer, size_t count) */ Socket::Ptr Socket::Accept(void) { + if (m_Protocol == IPPROTO_UDP) + BOOST_THROW_EXCEPTION(std::runtime_error("Accept cannot be used for UDP sockets.")); + int fd; sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -428,7 +438,6 @@ void Socket::SocketPair(SOCKET s[2]) * * @param node The node. * @param service The service. - * @param protocol The protocol */ void Socket::Connect(const String& node, const String& service) { @@ -436,16 +445,16 @@ void Socket::Connect(const String& node, const String& service) addrinfo *result; int error; const char *func; - - SocketType(); + memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; - hints.ai_socktype = socktype; - hints.ai_protocol = protocol; + hints.ai_socktype = m_SocketType; + hints.ai_protocol = m_Protocol; + int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); if (rc != 0) { - Log(LogCritical, protocol+"Socket") + Log(LogCritical, "Socket") << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; BOOST_THROW_EXCEPTION(socket_error() @@ -492,7 +501,7 @@ void Socket::Connect(const String& node, const String& service) freeaddrinfo(result); if (GetFD() == INVALID_SOCKET) { - Log(LogCritical, "UdpSocket") + Log(LogCritical, "Socket") << "Invalid socket: " << Utility::FormatErrorNumber(error); #ifndef _WIN32 diff --git a/lib/base/socket.hpp b/lib/base/socket.hpp index df5bff122..107b2a276 100644 --- a/lib/base/socket.hpp +++ b/lib/base/socket.hpp @@ -59,20 +59,19 @@ public: void MakeNonBlocking(void); static void SocketPair(SOCKET s[2]); - + protected: + Socket(int socketType, int protocol); + void SetFD(SOCKET fd); int GetError(void) const; - int socktype; - int protocol; - - virtual void SocketType(){}; - mutable boost::mutex m_SocketMutex; private: SOCKET m_FD; /**< The socket descriptor. */ + int m_SocketType; + int m_Protocol; static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); }; diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index 22ff6076c..a725f2520 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -27,10 +27,12 @@ using namespace icinga; -void TcpSocket::SocketType(){ - socktype = SOCK_STREAM; - protocol = IPPROTO_TCP; -} +/** + * Constructor for the TcpSocket class. + */ +TcpSocket::TcpSocket(void) + : Socket(SOCK_STREAM, IPPROTO_TCP) +{ } /** * Creates a socket and binds it to the specified service. @@ -136,97 +138,3 @@ void TcpSocket::Bind(const String& node, const String& service, int family) #endif /* _WIN32 */ } } - -/** - * Creates a socket and connects to the specified node and service. - * - * @param node The node. - * @param service The service. - */ -void TcpSocket::Connect(const String& node, const String& service) -{ - addrinfo hints; - addrinfo *result; - int error; - const char *func; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); - - if (rc != 0) { - Log(LogCritical, "TcpSocket") - << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; - - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("getaddrinfo") - << errinfo_getaddrinfo_error(rc)); - } - - int fd = INVALID_SOCKET; - - for (addrinfo *info = result; info != NULL; info = info->ai_next) { - fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - - if (fd == INVALID_SOCKET) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "socket"; - - continue; - } - - const int optTrue = 1; - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast(&optTrue), sizeof(optTrue)) != 0) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - Log(LogWarning, "TcpSocket") - << "setsockopt() unable to enable TCP keep-alives with error code " << rc; - } - - rc = connect(fd, info->ai_addr, info->ai_addrlen); - - if (rc < 0) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "connect"; - - closesocket(fd); - - continue; - } - - SetFD(fd); - - break; - } - - freeaddrinfo(result); - - if (GetFD() == INVALID_SOCKET) { - Log(LogCritical, "TcpSocket") - << "Invalid socket: " << Utility::FormatErrorNumber(error); - -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << boost::errinfo_errno(error)); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << errinfo_win32_error(error)); -#endif /* _WIN32 */ - } -} diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 3f4a74a7b..24cf1b67e 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -36,11 +36,10 @@ class I2_BASE_API TcpSocket : public Socket public: DECLARE_PTR_TYPEDEFS(TcpSocket); + TcpSocket(void); + void Bind(const String& service, int family); void Bind(const String& node, const String& service, int family); - -private: - void SocketType(); }; } diff --git a/lib/base/udpsocket.cpp b/lib/base/udpsocket.cpp index 2b219c992..c61f4a5ce 100644 --- a/lib/base/udpsocket.cpp +++ b/lib/base/udpsocket.cpp @@ -27,7 +27,10 @@ using namespace icinga; -void UdpSocket::SocketType(){ - socktype = SOCK_DGRAM; - protocol = IPPROTO_UDP; -} +/** + * Constructor for the UdpSocket class. + */ +UdpSocket::UdpSocket(void) + : Socket(SOCK_DGRAM, IPPROTO_UDP) +{ } + diff --git a/lib/base/udpsocket.hpp b/lib/base/udpsocket.hpp index d3b8fc275..815fbaba5 100644 --- a/lib/base/udpsocket.hpp +++ b/lib/base/udpsocket.hpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -34,11 +34,11 @@ namespace icinga class I2_BASE_API UdpSocket : public Socket { public: - DECLARE_PTR_TYPEDEFS(UdpSocket); + DECLARE_PTR_TYPEDEFS(UdpSocket); -private: - void SocketType(); + UdpSocket(void); }; + } #endif /* UDPSOCKET_H */ diff --git a/lib/perfdata/logstashwriter.cpp b/lib/perfdata/logstashwriter.cpp index def51d0a3..9c715298c 100644 --- a/lib/perfdata/logstashwriter.cpp +++ b/lib/perfdata/logstashwriter.cpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -23,7 +23,6 @@ #include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" #include "icinga/perfdatavalue.hpp" - #include "icinga/notification.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -31,7 +30,6 @@ #include "base/utility.hpp" #include "base/stream.hpp" #include "base/networkstream.hpp" - #include "base/json.hpp" #include "base/context.hpp" #include @@ -42,129 +40,135 @@ using namespace icinga; REGISTER_TYPE(LogstashWriter); - void LogstashWriter::Start(bool runtimeCreated) { - ObjectImpl::Start(runtimeCreated); + ObjectImpl::Start(runtimeCreated); - m_ReconnectTimer = new Timer(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); - // Send check results - Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); - // Send notifications - Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); - // Send state change - Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); + // Send check results + Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); + // Send notifications + Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); + // Send state change + Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); } - void LogstashWriter::ReconnectTimerHandler(void) { if (m_Stream) return; + Socket::Ptr socket; - if(GetDefaultProtocol() == true) + + if (GetSocketType() == "tcp") socket = new TcpSocket(); else socket = new UdpSocket(); - Log(LogNotice, "LogstashWriter") - << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogNotice, "LogstashWriter") + << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogCritical, "LogstashWriter") - << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - return; - } + try { + socket->Connect(GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogCritical, "LogstashWriter") + << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + return; + } - m_Stream = new NetworkStream(socket); + m_Stream = new NetworkStream(socket); } void LogstashWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { - CONTEXT("LOGSTASGH Processing check result for '" + checkable->GetName() + "'"); + CONTEXT("LOGSTASH Processing check result for '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter")<< "Logstash Processing check result for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Processing check result for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - double ts = cr->GetExecutionEnd(); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); - } + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } - fields->Set("hostname", host->GetName()); - fields->Set("type", "CHECK RESULT"); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("host_name", host->GetName()); + fields->Set("type", "CheckResult"); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("latency", cr->CalculateLatency()); - fields->Set("execution_time", cr->CalculateExecutionTime()); - fields->Set("reachable", checkable->IsReachable()); + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + fields->Set("reachable", checkable->IsReachable()); - if (cr) { - fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); - fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); - fields->Set("check_source", cr->GetCheckSource()); - } + double ts = Utility::GetTime(); - if (GetEnableSendPerfdata()) { - Array::Ptr perfdata = cr->GetPerformanceData(); + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + fields->Set("check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } - if (perfdata) { - ObjectLock olock(perfdata); - BOOST_FOREACH(const Value& val, perfdata) { - PerfdataValue::Ptr pdv; + Array::Ptr perfdata = cr->GetPerformanceData(); - if (val.IsObjectType()) - pdv = val; - else { - try { - pdv = PerfdataValue::Parse(val); - String escaped_key = pdv->GetLabel(); - boost::replace_all(escaped_key, " ", "_"); - boost::replace_all(escaped_key, ".", "_"); - boost::replace_all(escaped_key, "\\", "_"); - boost::algorithm::replace_all(escaped_key, "::", "."); - fields->Set(escaped_key, pdv->GetValue()); + if (perfdata) { + Dictionary::Ptr perfdataItems = new Dictionary(); - if (pdv->GetMin()) - fields->Set(escaped_key + "_min", pdv->GetMin()); - if (pdv->GetMax()) - fields->Set(escaped_key + "_max", pdv->GetMax()); - if (pdv->GetWarn()) - fields->Set(escaped_key + "_warn", pdv->GetWarn()); - if (pdv->GetCrit()) - fields->Set(escaped_key + "_crit", pdv->GetCrit()); - } catch (const std::exception&) { - Log(LogWarning, "LogstashWriter") - << "Ignoring invalid perfdata value: '" << val << "' for object '" - << checkable-GetName() << "'."; - } - } - } - } - } + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "LogstashWriter") + << "Ignoring invalid perfdata value: '" << val << "' for object '" + << checkable->GetName() << "'."; + continue; + } + } + + Dictionary::Ptr perfdataItem = new Dictionary(); + perfdataItem->Set("value", pdv->GetValue()); + + if (pdv->GetMin()) + perfdataItem->Set("min", pdv->GetMin()); + if (pdv->GetMax()) + perfdataItem->Set("max", pdv->GetMax()); + if (pdv->GetWarn()) + perfdataItem->Set("warn", pdv->GetWarn()); + if (pdv->GetCrit()) + perfdataItem->Set("crit", pdv->GetCrit()); + + String escaped_key = EscapeMetricLabel(pdv->GetLabel()); + + perfdataItems->Set(escaped_key, perfdataItem); + } + + fields->Set("performance_data", perfdataItems); + } + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } @@ -172,85 +176,87 @@ void LogstashWriter::NotificationToUserHandler(const Notification::Ptr& notifica const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, const String& author, const String& comment_text, const String& command_name) { - CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Logstash Processing notification for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Processing notification for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - double ts = cr->GetExecutionEnd(); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); - String notification_type_str = Notification::NotificationTypeToString(notification_type); + String notification_type_str = Notification::NotificationTypeToString(notification_type); - String author_comment = ""; + String author_comment = ""; - if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { - author_comment = author + ";" + comment_text; - } + if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { + author_comment = author + ";" + comment_text; + } - String output; - if (cr) - output = CompatUtility::GetCheckResultOutput(cr); + double ts = Utility::GetTime(); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("type", "SERVICE NOTIFICATION"); - fields->Set("service", service->GetShortName()); - fields->Set("short_message", output); - } else { - fields->Set("type", "HOST NOTIFICATION"); - fields->Set("short_message", CompatUtility::GetHostStateString(host)+ ")"); - } + if (service) { + fields->Set("type", "SERVICE NOTIFICATION"); + fields->Set("service_name", service->GetShortName()); + } else { + fields->Set("type", "HOST NOTIFICATION"); + } - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + ts = cr->GetExecutionEnd(); + } - fields->Set("hostname", host->GetName()); - fields->Set("command", command_name); - fields->Set("notification_type", notification_type_str); - fields->Set("comment", author_comment); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + fields->Set("host_name", host->GetName()); + fields->Set("command", command_name); + fields->Set("notification_type", notification_type_str); + fields->Set("comment", author_comment); + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } void LogstashWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) { - CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Logstash Processing state change for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Processing state change for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - double ts = cr->GetExecutionEnd(); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("type", "STATE CHANGE"); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("hostname", host->GetName()); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("type", "StateChange"); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("hostname", host->GetName()); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); } - if (cr) { - fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); - fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); - fields->Set("check_source", cr->GetCheckSource()); - } - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + double ts = Utility::GetTime(); + + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + fields->Set("check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts) @@ -258,8 +264,8 @@ String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, con fields->Set("version", "1.1"); fields->Set("host", source); fields->Set("timestamp", ts); - String logstashObj= JsonEncode(fields); - return logstashObj+ "\n"; + + return JsonEncode(fields) + "\n"; } void LogstashWriter::SendLogMessage(const String& message) @@ -270,12 +276,32 @@ void LogstashWriter::SendLogMessage(const String& message) return; try { - m_Stream->Write(&message[0], message.GetLength()); + m_Stream->Write(&message[0], message.GetLength()); } catch (const std::exception& ex) { - Log(LogCritical, "LogstashWriter") << "Cannot write to " << - ((GetDefaultProtocol()==true) ? "tcp" : "udp") << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogCritical, "LogstashWriter") + << "Cannot write to " << GetSocketType() + << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; m_Stream.reset(); } } +String LogstashWriter::EscapeMetricLabel(const String& str) +{ + String result = str; + + boost::replace_all(result, " ", "_"); + boost::replace_all(result, ".", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, "::", "."); + + return result; +} + +void LogstashWriter::ValidateSocketType(const String& value, const ValidationUtils& utils) +{ + ObjectImpl::ValidateSocketType(value, utils); + + if (value != "udp" && value != "tcp") + BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("socket_type"), "Socket type '" + value + "' is invalid.")); +} diff --git a/lib/perfdata/logstashwriter.hpp b/lib/perfdata/logstashwriter.hpp index 65c46f51b..2487b29e9 100644 --- a/lib/perfdata/logstashwriter.hpp +++ b/lib/perfdata/logstashwriter.hpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -39,27 +39,32 @@ namespace icinga */ class LogstashWriter : public ObjectImpl { + public: - DECLARE_OBJECT(LogstashWriter); - DECLARE_OBJECTNAME(LogstashWriter); + DECLARE_OBJECT(LogstashWriter); + DECLARE_OBJECTNAME(LogstashWriter); + + virtual void ValidateSocketType(const String& value, const ValidationUtils& utils) override; protected: - virtual void Start(bool runtimeCreated) override; + virtual void Start(bool runtimeCreated) override; private: - Stream::Ptr m_Stream; + Stream::Ptr m_Stream; - Timer::Ptr m_ReconnectTimer; + Timer::Ptr m_ReconnectTimer; - void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, - const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, - const String& author, const String& comment_text, const String& command_name); - void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); - void SendLogMessage(const String& message); + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, + const String& author, const String& comment_text, const String& command_name); + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void SendLogMessage(const String& message); String ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts); - void ReconnectTimerHandler(void); + static String EscapeMetricLabel(const String& str); + + void ReconnectTimerHandler(void); }; } diff --git a/lib/perfdata/logstashwriter.ti b/lib/perfdata/logstashwriter.ti index 6d822c995..b64bcf8db 100644 --- a/lib/perfdata/logstashwriter.ti +++ b/lib/perfdata/logstashwriter.ti @@ -33,18 +33,14 @@ class LogstashWriter : ConfigObject [config] String port { default {{{ return "9201"; }}} }; - - [config] bool defaultProtocol { - default {{{ return "true"; }}} + + [config] String socket_type { + default {{{ return "udp"; }}} }; [config] String source { default {{{ return "icinga2"; }}} }; - - [config] bool enable_send_perfdata { - default {{{ return false; }}} - }; }; }