diff --git a/doc/14-features.md b/doc/14-features.md index eb6c231b3..6a7511bac 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -358,6 +358,82 @@ Currently these events are processed: * State changes * Notifications +### Logstash Writer + +[Logstash](https://www.elastic.co/products/logstash) receives +and processes event messages sent by Icinga 2 and the [LogstashWriter](9-object-types.md#objecttype-logstashwriter) +feature. As part of the Elastic Stack it allows you to +process and modify the messages and forward them to [Elasticsearch](https://www.elastic.co/products/elasticsearch) +as backed. + +Before proceeding with this integration guide please ensure +that you have Logstash, Elasticsearch and Kibana up and running +as part of the Elastic Stack. + +> **Note** +> +> The LogstashWriter feature has been tested with Elastic Stack 5.x and therefore Logstash 5.x. +> Older versions are not supported. + +Logstash supports `TCP` and `UDP` as input socket type. You must +further enable JSON support for input data processing. Logstash 5.x +comes without any pre-installed plugins and requires you to install +them separately. + +Example on CentOS 7 and UDP as socket type: + +``` +/usr/share/logstash/bin/logstash-plugin install logstash-input-udp +/usr/share/logstash/bin/logstash-plugin install logstash-input-json +``` + +Add the Icinga 2 input and set the output to your running Elasticsearch instance. +You do not need to reload Logstash since version 5.x supports configuration changes +without restart. + +This example uses port `5555`. You are allowed to use any available port (note it for later). + +``` +# vim /etc/logstash/conf.d/icinga2.conf + +input { + udp { + port => 5555 + codec => "json" + } +} +output { + elasticsearch { + hosts => [ "localhost:9200" ] + } +} +``` + +Modify the feature configuration and set the +socket type, host and port attributes. The port must be the same +as configured in your Logstash input, e.g. `5555`. + +``` +# vim /etc/icinga2/features-available/logstash.conf + +object LogstashWriter "logstash" { + host = "192.168.33.7" + port = 5555 + socket_type = "udp" +} +``` + +Enable the feature and restart Icinga 2. + +``` +# icinga2 feature enable logstash +# systemctl restart icinga2 +``` + +Open [Kibana](https://www.elastic.co/products/kibana) or your +favorite Elasticsearch frontend and visualize the messages received +from Icinga 2. + ### OpenTSDB Writer While there are some OpenTSDB collector scripts and daemons like tcollector available for diff --git a/doc/9-object-types.md b/doc/9-object-types.md index 671341e7e..61586c5ae 100644 --- a/doc/9-object-types.md +++ b/doc/9-object-types.md @@ -1009,6 +1009,31 @@ Configuration Attributes: > > UNIX sockets are not supported on Windows. +## LogstashWriter + +Writes Icinga 2 event messages to [Logstash](14-features.md#logstash-writer). + +Example: + +``` +library "perfdata" + +object LogstashWriter "logstash" { + host = "192.168.33.7" + port = 5555 + socket_type = "udp" +} +``` + +Configuration Attributes: + + Name |Description + ----------------------|---------------------- + host |**Optional.** Logstash receiver host address. Defaults to `127.0.0.1`. + port |**Optional.** Logstash receiver port. Defaults to `9201`. + socket_type |**Optional.** Socket type. Can be either `udp` or `tcp`. Defaults to `udp`. + source |**Optional.** Source name for this instance. Defaults to `icinga2`. + ## Notification diff --git a/etc/icinga2/features-available/logstash.conf b/etc/icinga2/features-available/logstash.conf new file mode 100644 index 000000000..2c506ea7e --- /dev/null +++ b/etc/icinga2/features-available/logstash.conf @@ -0,0 +1,12 @@ +/** + * The LogstashWriter type writes check result metrics and + * performance data to a TCP or UDP socket. + */ + +library "perfdata" + +object LogstashWriter "logstash" { + //host = "127.0.0.1" + //port = 9201 + //socket_type = "udp" +} diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 7f362f6cb..420e3204c 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -36,7 +36,7 @@ set(base_SOURCES function.cpp function.thpp function-script.cpp functionwrapper.cpp scriptglobal.cpp scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp stacktrace.cpp statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp - sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp threadpool.cpp timer.cpp + sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp udpsocket.cpp threadpool.cpp timer.cpp tlsstream.cpp tlsutility.cpp type.cpp typetype-script.cpp unixsocket.cpp utility.cpp value.cpp value-operators.cpp workqueue.cpp ) diff --git a/lib/base/dictionary.cpp b/lib/base/dictionary.cpp index 2594b850d..1d55f557e 100644 --- a/lib/base/dictionary.cpp +++ b/lib/base/dictionary.cpp @@ -45,6 +45,7 @@ Value Dictionary::Get(const String& key) const return it->second; } + /** * Retrieves a value from a dictionary. * diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index 026da03c3..ed9d4ebd8 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -29,7 +29,7 @@ #include #ifndef _WIN32 -# include +#include #endif /* _WIN32 */ using namespace icinga; @@ -38,14 +38,21 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET), m_SocketType(SOCK_STREAM), m_Protocol(IPPROTO_TCP) +{ } + +/** + * Constructor for the Socket class. + */ +Socket::Socket(int socketType, int protocol) + : m_FD(INVALID_SOCKET), m_SocketType(socketType), m_Protocol(protocol) { } /** * Constructor for the Socket class. */ Socket::Socket(SOCKET fd) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET) { SetFD(fd); } @@ -330,6 +337,9 @@ size_t Socket::Read(void *buffer, size_t count) */ Socket::Ptr Socket::Accept(void) { + if (m_Protocol == IPPROTO_UDP) + BOOST_THROW_EXCEPTION(std::runtime_error("Accept cannot be used for UDP sockets.")); + int fd; sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -423,3 +433,85 @@ void Socket::SocketPair(SOCKET s[2]) << boost::errinfo_errno(errno)); } +/** + * Creates a socket and connects to the specified node and service. + * + * @param node The node. + * @param service The service. + */ +void Socket::Connect(const String& node, const String& service) +{ + addrinfo hints; + addrinfo *result; + int error; + const char *func; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = m_SocketType; + hints.ai_protocol = m_Protocol; + + int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); + + if (rc != 0) { + Log(LogCritical, "Socket") + << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; + + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("getaddrinfo") + << errinfo_getaddrinfo_error(rc)); + } + + int fd = INVALID_SOCKET; + + for (addrinfo *info = result; info != NULL; info = info->ai_next) { + fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + + if (fd == INVALID_SOCKET) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "socket"; + + continue; + } + + rc = connect(fd, info->ai_addr, info->ai_addrlen); + + if (rc < 0) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "connect"; + + closesocket(fd); + + continue; + } + + SetFD(fd); + + break; + } + + freeaddrinfo(result); + + if (GetFD() == INVALID_SOCKET) { + Log(LogCritical, "Socket") + << "Invalid socket: " << Utility::FormatErrorNumber(error); + +#ifndef _WIN32 + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << boost::errinfo_errno(error)); +#else /* _WIN32 */ + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << errinfo_win32_error(error)); +#endif /* _WIN32 */ + } +} diff --git a/lib/base/socket.hpp b/lib/base/socket.hpp index 12834233b..107b2a276 100644 --- a/lib/base/socket.hpp +++ b/lib/base/socket.hpp @@ -51,6 +51,7 @@ public: size_t Write(const void *buffer, size_t size); void Listen(void); + void Connect(const String& node, const String& service); Socket::Ptr Accept(void); bool Poll(bool read, bool write, struct timeval *timeout = NULL); @@ -60,14 +61,17 @@ public: static void SocketPair(SOCKET s[2]); protected: + Socket(int socketType, int protocol); + void SetFD(SOCKET fd); int GetError(void) const; - mutable boost::mutex m_SocketMutex; private: SOCKET m_FD; /**< The socket descriptor. */ + int m_SocketType; + int m_Protocol; static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); }; diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index 734f0878d..a725f2520 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -27,6 +27,13 @@ using namespace icinga; +/** + * Constructor for the TcpSocket class. + */ +TcpSocket::TcpSocket(void) + : Socket(SOCK_STREAM, IPPROTO_TCP) +{ } + /** * Creates a socket and binds it to the specified service. * @@ -131,97 +138,3 @@ void TcpSocket::Bind(const String& node, const String& service, int family) #endif /* _WIN32 */ } } - -/** - * Creates a socket and connects to the specified node and service. - * - * @param node The node. - * @param service The service. - */ -void TcpSocket::Connect(const String& node, const String& service) -{ - addrinfo hints; - addrinfo *result; - int error; - const char *func; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); - - if (rc != 0) { - Log(LogCritical, "TcpSocket") - << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; - - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("getaddrinfo") - << errinfo_getaddrinfo_error(rc)); - } - - int fd = INVALID_SOCKET; - - for (addrinfo *info = result; info != NULL; info = info->ai_next) { - fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - - if (fd == INVALID_SOCKET) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "socket"; - - continue; - } - - const int optTrue = 1; - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast(&optTrue), sizeof(optTrue)) != 0) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - Log(LogWarning, "TcpSocket") - << "setsockopt() unable to enable TCP keep-alives with error code " << rc; - } - - rc = connect(fd, info->ai_addr, info->ai_addrlen); - - if (rc < 0) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "connect"; - - closesocket(fd); - - continue; - } - - SetFD(fd); - - break; - } - - freeaddrinfo(result); - - if (GetFD() == INVALID_SOCKET) { - Log(LogCritical, "TcpSocket") - << "Invalid socket: " << Utility::FormatErrorNumber(error); - -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << boost::errinfo_errno(error)); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << errinfo_win32_error(error)); -#endif /* _WIN32 */ - } -} diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 53769c92f..24cf1b67e 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -36,10 +36,10 @@ class I2_BASE_API TcpSocket : public Socket public: DECLARE_PTR_TYPEDEFS(TcpSocket); + TcpSocket(void); + void Bind(const String& service, int family); void Bind(const String& node, const String& service, int family); - - void Connect(const String& node, const String& service); }; } diff --git a/lib/base/udpsocket.cpp b/lib/base/udpsocket.cpp new file mode 100644 index 000000000..c61f4a5ce --- /dev/null +++ b/lib/base/udpsocket.cpp @@ -0,0 +1,36 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/udpsocket.hpp" +#include "base/logger.hpp" +#include "base/utility.hpp" +#include "base/exception.hpp" +#include +#include +#include + +using namespace icinga; + +/** + * Constructor for the UdpSocket class. + */ +UdpSocket::UdpSocket(void) + : Socket(SOCK_DGRAM, IPPROTO_UDP) +{ } + diff --git a/lib/base/udpsocket.hpp b/lib/base/udpsocket.hpp new file mode 100644 index 000000000..815fbaba5 --- /dev/null +++ b/lib/base/udpsocket.hpp @@ -0,0 +1,45 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef UDPSOCKET_H +#define UDPSOCKET_H + +#include "base/i2-base.hpp" +#include "base/socket.hpp" + +namespace icinga +{ + +/** + * A UDP socket. + * + * @ingroup base + */ +class I2_BASE_API UdpSocket : public Socket +{ +public: + DECLARE_PTR_TYPEDEFS(UdpSocket); + + UdpSocket(void); +}; + +} + +#endif /* UDPSOCKET_H */ + diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index 08920dd3d..f4d86f965 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -17,12 +17,13 @@ mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp) mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp) +mkclass_target(logstashwriter.ti logstashwriter.tcpp logstashwriter.thpp) mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp) mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp) mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp) set(perfdata_SOURCES - gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp + gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp logstashwriter.cpp logstashwriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp ) if(ICINGA2_UNITY_BUILD) @@ -51,6 +52,11 @@ install_if_not_exists( ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available ) +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/logstash.conf + ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available +) + install_if_not_exists( ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index 26bb1c426..0b613fb22 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -64,3 +64,4 @@ private: } #endif /* GELFWRITER_H */ + diff --git a/lib/perfdata/logstashwriter.cpp b/lib/perfdata/logstashwriter.cpp new file mode 100644 index 000000000..9c715298c --- /dev/null +++ b/lib/perfdata/logstashwriter.cpp @@ -0,0 +1,307 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "perfdata/logstashwriter.hpp" +#include "perfdata/logstashwriter.tcpp" +#include "icinga/service.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/compatutility.hpp" +#include "icinga/perfdatavalue.hpp" +#include "icinga/notification.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/utility.hpp" +#include "base/stream.hpp" +#include "base/networkstream.hpp" +#include "base/json.hpp" +#include "base/context.hpp" +#include +#include +#include + +using namespace icinga; + +REGISTER_TYPE(LogstashWriter); + +void LogstashWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + // Send check results + Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); + // Send notifications + Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); + // Send state change + Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); +} + +void LogstashWriter::ReconnectTimerHandler(void) +{ + if (m_Stream) + return; + + Socket::Ptr socket; + + if (GetSocketType() == "tcp") + socket = new TcpSocket(); + else + socket = new UdpSocket(); + + Log(LogNotice, "LogstashWriter") + << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + + try { + socket->Connect(GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogCritical, "LogstashWriter") + << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + return; + } + + m_Stream = new NetworkStream(socket); +} + +void LogstashWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + CONTEXT("LOGSTASH Processing check result for '" + checkable->GetName() + "'"); + + Log(LogDebug, "LogstashWriter") + << "Processing check result for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + fields->Set("host_name", host->GetName()); + fields->Set("type", "CheckResult"); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + fields->Set("reachable", checkable->IsReachable()); + + double ts = Utility::GetTime(); + + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + fields->Set("check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } + + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (perfdata) { + Dictionary::Ptr perfdataItems = new Dictionary(); + + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "LogstashWriter") + << "Ignoring invalid perfdata value: '" << val << "' for object '" + << checkable->GetName() << "'."; + continue; + } + } + + Dictionary::Ptr perfdataItem = new Dictionary(); + perfdataItem->Set("value", pdv->GetValue()); + + if (pdv->GetMin()) + perfdataItem->Set("min", pdv->GetMin()); + if (pdv->GetMax()) + perfdataItem->Set("max", pdv->GetMax()); + if (pdv->GetWarn()) + perfdataItem->Set("warn", pdv->GetWarn()); + if (pdv->GetCrit()) + perfdataItem->Set("crit", pdv->GetCrit()); + + String escaped_key = EscapeMetricLabel(pdv->GetLabel()); + + perfdataItems->Set(escaped_key, perfdataItem); + } + + fields->Set("performance_data", perfdataItems); + } + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); +} + + +void LogstashWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, + const String& author, const String& comment_text, const String& command_name) +{ + CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); + + Log(LogDebug, "LogstashWriter") + << "Processing notification for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + String notification_type_str = Notification::NotificationTypeToString(notification_type); + + String author_comment = ""; + + if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { + author_comment = author + ";" + comment_text; + } + + double ts = Utility::GetTime(); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("type", "SERVICE NOTIFICATION"); + fields->Set("service_name", service->GetShortName()); + } else { + fields->Set("type", "HOST NOTIFICATION"); + } + + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + ts = cr->GetExecutionEnd(); + } + + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + + fields->Set("host_name", host->GetName()); + fields->Set("command", command_name); + fields->Set("notification_type", notification_type_str); + fields->Set("comment", author_comment); + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); +} + +void LogstashWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); + + Log(LogDebug, "LogstashWriter") + << "Processing state change for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("type", "StateChange"); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("hostname", host->GetName()); + + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + double ts = Utility::GetTime(); + + if (cr) { + fields->Set("plugin_output", cr->GetOutput()); + fields->Set("check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } + + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); +} + +String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts) +{ + fields->Set("version", "1.1"); + fields->Set("host", source); + fields->Set("timestamp", ts); + + return JsonEncode(fields) + "\n"; +} + +void LogstashWriter::SendLogMessage(const String& message) +{ + ObjectLock olock(this); + + if (!m_Stream) + return; + + try { + m_Stream->Write(&message[0], message.GetLength()); + } catch (const std::exception& ex) { + Log(LogCritical, "LogstashWriter") + << "Cannot write to " << GetSocketType() + << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + + m_Stream.reset(); + } +} + +String LogstashWriter::EscapeMetricLabel(const String& str) +{ + String result = str; + + boost::replace_all(result, " ", "_"); + boost::replace_all(result, ".", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, "::", "."); + + return result; +} + +void LogstashWriter::ValidateSocketType(const String& value, const ValidationUtils& utils) +{ + ObjectImpl::ValidateSocketType(value, utils); + + if (value != "udp" && value != "tcp") + BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("socket_type"), "Socket type '" + value + "' is invalid.")); +} diff --git a/lib/perfdata/logstashwriter.hpp b/lib/perfdata/logstashwriter.hpp new file mode 100644 index 000000000..2487b29e9 --- /dev/null +++ b/lib/perfdata/logstashwriter.hpp @@ -0,0 +1,72 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef LOGSTASHWRITER_H +#define LOGSTASHWRITER_H + +#include "perfdata/logstashwriter.thpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/udpsocket.hpp" +#include "base/timer.hpp" +#include +#include + +namespace icinga +{ + +/** + * An Icinga logstash writer. + * + * @ingroup perfdata + */ +class LogstashWriter : public ObjectImpl +{ + +public: + DECLARE_OBJECT(LogstashWriter); + DECLARE_OBJECTNAME(LogstashWriter); + + virtual void ValidateSocketType(const String& value, const ValidationUtils& utils) override; + +protected: + virtual void Start(bool runtimeCreated) override; + +private: + Stream::Ptr m_Stream; + + Timer::Ptr m_ReconnectTimer; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, + const String& author, const String& comment_text, const String& command_name); + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void SendLogMessage(const String& message); + String ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts); + + static String EscapeMetricLabel(const String& str); + + void ReconnectTimerHandler(void); +}; + +} + +#endif /* LOGSTASHWRITER_H */ diff --git a/lib/perfdata/logstashwriter.ti b/lib/perfdata/logstashwriter.ti new file mode 100644 index 000000000..b64bcf8db --- /dev/null +++ b/lib/perfdata/logstashwriter.ti @@ -0,0 +1,46 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class LogstashWriter : ConfigObject +{ + [config] String host { + default {{{ return "127.0.0.1"; }}} + }; + + [config] String port { + default {{{ return "9201"; }}} + }; + + [config] String socket_type { + default {{{ return "udp"; }}} + }; + + [config] String source { + default {{{ return "icinga2"; }}} + }; +}; + +}