diff --git a/doc/14-features.md b/doc/14-features.md index 51584b44e..88ec89854 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -369,82 +369,6 @@ More integrations in development: * [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API. * [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs. -#### Logstash Writer - -[Logstash](https://www.elastic.co/products/logstash) receives -and processes event messages sent by Icinga 2 and the [LogstashWriter](9-object-types.md#objecttype-logstashwriter) -feature. As part of the Elastic Stack it allows you to -process and modify the messages and forward them to [Elasticsearch](https://www.elastic.co/products/elasticsearch) -as backed. - -Before proceeding with this integration guide please ensure -that you have Logstash, Elasticsearch and Kibana up and running -as part of the Elastic Stack. - -> **Note** -> -> The LogstashWriter feature has been tested with Elastic Stack 5.x and therefore Logstash 5.x. -> Older versions are not supported. - -Logstash supports `TCP` and `UDP` as input socket type. You must -further enable JSON support for input data processing. Logstash 5.x -comes without any pre-installed plugins and requires you to install -them separately. - -Example on CentOS 7 and UDP as socket type: - -``` -/usr/share/logstash/bin/logstash-plugin install logstash-input-udp -/usr/share/logstash/bin/logstash-plugin install logstash-codec-json -``` - -Add the Icinga 2 input and set the output to your running Elasticsearch instance. -You do not need to reload Logstash since version 5.x supports configuration changes -without restart. - -This example uses port `5555`. You are allowed to use any available port (note it for later). - -``` -# vim /etc/logstash/conf.d/icinga2.conf - -input { - udp { - port => 5555 - codec => "json" - } -} -output { - elasticsearch { - hosts => [ "localhost:9200" ] - } -} -``` - -Modify the feature configuration and set the -socket type, host and port attributes. The port must be the same -as configured in your Logstash input, e.g. `5555`. - -``` -# vim /etc/icinga2/features-available/logstash.conf - -object LogstashWriter "logstash" { - host = "192.168.33.7" - port = 5555 - socket_type = "udp" -} -``` - -Enable the feature and restart Icinga 2. - -``` -# icinga2 feature enable logstash -# systemctl restart icinga2 -``` - -Open [Kibana](https://www.elastic.co/products/kibana) or your -favorite Elasticsearch frontend and visualize the messages received -from Icinga 2. - ### OpenTSDB Writer While there are some OpenTSDB collector scripts and daemons like tcollector available for diff --git a/doc/9-object-types.md b/doc/9-object-types.md index d7b53b444..1a8e1123e 100644 --- a/doc/9-object-types.md +++ b/doc/9-object-types.md @@ -1006,31 +1006,6 @@ Configuration Attributes: > > UNIX sockets are not supported on Windows. -## LogstashWriter - -Writes Icinga 2 event messages to [Logstash](14-features.md#logstash-writer). - -Example: - -``` -library "perfdata" - -object LogstashWriter "logstash" { - host = "192.168.33.7" - port = 5555 - socket_type = "udp" -} -``` - -Configuration Attributes: - - Name |Description - ----------------------|---------------------- - host |**Optional.** Logstash receiver host address. Defaults to `127.0.0.1`. - port |**Optional.** Logstash receiver port. Defaults to `9201`. - socket_type |**Optional.** Socket type. Can be either `udp` or `tcp`. Defaults to `udp`. - source |**Optional.** Source name for this instance. Defaults to `icinga2`. - ## Notification diff --git a/etc/icinga2/features-available/logstash.conf b/etc/icinga2/features-available/logstash.conf deleted file mode 100644 index 2c506ea7e..000000000 --- a/etc/icinga2/features-available/logstash.conf +++ /dev/null @@ -1,12 +0,0 @@ -/** - * The LogstashWriter type writes check result metrics and - * performance data to a TCP or UDP socket. - */ - -library "perfdata" - -object LogstashWriter "logstash" { - //host = "127.0.0.1" - //port = 9201 - //socket_type = "udp" -} diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index b80a7bc2b..858943f81 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -38,7 +38,7 @@ set(base_SOURCES perfdatavalue.cpp perfdatavalue.thpp scriptglobal.cpp scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp stacktrace.cpp statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp - sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp udpsocket.cpp threadpool.cpp timer.cpp + sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp threadpool.cpp timer.cpp tlsstream.cpp tlsutility.cpp type.cpp typetype-script.cpp unixsocket.cpp utility.cpp value.cpp value-operators.cpp workqueue.cpp ) diff --git a/lib/base/dictionary.cpp b/lib/base/dictionary.cpp index 1d55f557e..2594b850d 100644 --- a/lib/base/dictionary.cpp +++ b/lib/base/dictionary.cpp @@ -45,7 +45,6 @@ Value Dictionary::Get(const String& key) const return it->second; } - /** * Retrieves a value from a dictionary. * diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index ed9d4ebd8..026da03c3 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -29,7 +29,7 @@ #include #ifndef _WIN32 -#include +# include #endif /* _WIN32 */ using namespace icinga; @@ -38,21 +38,14 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET), m_SocketType(SOCK_STREAM), m_Protocol(IPPROTO_TCP) -{ } - -/** - * Constructor for the Socket class. - */ -Socket::Socket(int socketType, int protocol) - : m_FD(INVALID_SOCKET), m_SocketType(socketType), m_Protocol(protocol) + : m_FD(INVALID_SOCKET) { } /** * Constructor for the Socket class. */ Socket::Socket(SOCKET fd) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET) { SetFD(fd); } @@ -337,9 +330,6 @@ size_t Socket::Read(void *buffer, size_t count) */ Socket::Ptr Socket::Accept(void) { - if (m_Protocol == IPPROTO_UDP) - BOOST_THROW_EXCEPTION(std::runtime_error("Accept cannot be used for UDP sockets.")); - int fd; sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -433,85 +423,3 @@ void Socket::SocketPair(SOCKET s[2]) << boost::errinfo_errno(errno)); } -/** - * Creates a socket and connects to the specified node and service. - * - * @param node The node. - * @param service The service. - */ -void Socket::Connect(const String& node, const String& service) -{ - addrinfo hints; - addrinfo *result; - int error; - const char *func; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = m_SocketType; - hints.ai_protocol = m_Protocol; - - int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); - - if (rc != 0) { - Log(LogCritical, "Socket") - << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; - - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("getaddrinfo") - << errinfo_getaddrinfo_error(rc)); - } - - int fd = INVALID_SOCKET; - - for (addrinfo *info = result; info != NULL; info = info->ai_next) { - fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - - if (fd == INVALID_SOCKET) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "socket"; - - continue; - } - - rc = connect(fd, info->ai_addr, info->ai_addrlen); - - if (rc < 0) { -#ifdef _WIN32 - error = WSAGetLastError(); -#else /* _WIN32 */ - error = errno; -#endif /* _WIN32 */ - func = "connect"; - - closesocket(fd); - - continue; - } - - SetFD(fd); - - break; - } - - freeaddrinfo(result); - - if (GetFD() == INVALID_SOCKET) { - Log(LogCritical, "Socket") - << "Invalid socket: " << Utility::FormatErrorNumber(error); - -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << boost::errinfo_errno(error)); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function(func) - << errinfo_win32_error(error)); -#endif /* _WIN32 */ - } -} diff --git a/lib/base/socket.hpp b/lib/base/socket.hpp index 107b2a276..12834233b 100644 --- a/lib/base/socket.hpp +++ b/lib/base/socket.hpp @@ -51,7 +51,6 @@ public: size_t Write(const void *buffer, size_t size); void Listen(void); - void Connect(const String& node, const String& service); Socket::Ptr Accept(void); bool Poll(bool read, bool write, struct timeval *timeout = NULL); @@ -61,17 +60,14 @@ public: static void SocketPair(SOCKET s[2]); protected: - Socket(int socketType, int protocol); - void SetFD(SOCKET fd); int GetError(void) const; + mutable boost::mutex m_SocketMutex; private: SOCKET m_FD; /**< The socket descriptor. */ - int m_SocketType; - int m_Protocol; static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); }; diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index a725f2520..734f0878d 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -27,13 +27,6 @@ using namespace icinga; -/** - * Constructor for the TcpSocket class. - */ -TcpSocket::TcpSocket(void) - : Socket(SOCK_STREAM, IPPROTO_TCP) -{ } - /** * Creates a socket and binds it to the specified service. * @@ -138,3 +131,97 @@ void TcpSocket::Bind(const String& node, const String& service, int family) #endif /* _WIN32 */ } } + +/** + * Creates a socket and connects to the specified node and service. + * + * @param node The node. + * @param service The service. + */ +void TcpSocket::Connect(const String& node, const String& service) +{ + addrinfo hints; + addrinfo *result; + int error; + const char *func; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); + + if (rc != 0) { + Log(LogCritical, "TcpSocket") + << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; + + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("getaddrinfo") + << errinfo_getaddrinfo_error(rc)); + } + + int fd = INVALID_SOCKET; + + for (addrinfo *info = result; info != NULL; info = info->ai_next) { + fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + + if (fd == INVALID_SOCKET) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "socket"; + + continue; + } + + const int optTrue = 1; + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast(&optTrue), sizeof(optTrue)) != 0) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + Log(LogWarning, "TcpSocket") + << "setsockopt() unable to enable TCP keep-alives with error code " << rc; + } + + rc = connect(fd, info->ai_addr, info->ai_addrlen); + + if (rc < 0) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "connect"; + + closesocket(fd); + + continue; + } + + SetFD(fd); + + break; + } + + freeaddrinfo(result); + + if (GetFD() == INVALID_SOCKET) { + Log(LogCritical, "TcpSocket") + << "Invalid socket: " << Utility::FormatErrorNumber(error); + +#ifndef _WIN32 + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << boost::errinfo_errno(error)); +#else /* _WIN32 */ + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << errinfo_win32_error(error)); +#endif /* _WIN32 */ + } +} diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 24cf1b67e..53769c92f 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -36,10 +36,10 @@ class I2_BASE_API TcpSocket : public Socket public: DECLARE_PTR_TYPEDEFS(TcpSocket); - TcpSocket(void); - void Bind(const String& service, int family); void Bind(const String& node, const String& service, int family); + + void Connect(const String& node, const String& service); }; } diff --git a/lib/base/udpsocket.cpp b/lib/base/udpsocket.cpp deleted file mode 100644 index c61f4a5ce..000000000 --- a/lib/base/udpsocket.cpp +++ /dev/null @@ -1,36 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "base/udpsocket.hpp" -#include "base/logger.hpp" -#include "base/utility.hpp" -#include "base/exception.hpp" -#include -#include -#include - -using namespace icinga; - -/** - * Constructor for the UdpSocket class. - */ -UdpSocket::UdpSocket(void) - : Socket(SOCK_DGRAM, IPPROTO_UDP) -{ } - diff --git a/lib/base/udpsocket.hpp b/lib/base/udpsocket.hpp deleted file mode 100644 index 815fbaba5..000000000 --- a/lib/base/udpsocket.hpp +++ /dev/null @@ -1,45 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef UDPSOCKET_H -#define UDPSOCKET_H - -#include "base/i2-base.hpp" -#include "base/socket.hpp" - -namespace icinga -{ - -/** - * A UDP socket. - * - * @ingroup base - */ -class I2_BASE_API UdpSocket : public Socket -{ -public: - DECLARE_PTR_TYPEDEFS(UdpSocket); - - UdpSocket(void); -}; - -} - -#endif /* UDPSOCKET_H */ - diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index f4d86f965..08920dd3d 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -17,13 +17,12 @@ mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp) mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp) -mkclass_target(logstashwriter.ti logstashwriter.tcpp logstashwriter.thpp) mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp) mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp) mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp) set(perfdata_SOURCES - gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp logstashwriter.cpp logstashwriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp + gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp ) if(ICINGA2_UNITY_BUILD) @@ -52,11 +51,6 @@ install_if_not_exists( ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available ) -install_if_not_exists( - ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/logstash.conf - ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available -) - install_if_not_exists( ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index 0b613fb22..26bb1c426 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -64,4 +64,3 @@ private: } #endif /* GELFWRITER_H */ - diff --git a/lib/perfdata/logstashwriter.cpp b/lib/perfdata/logstashwriter.cpp index dde0bd678..523c9ef46 100644 --- a/lib/perfdata/logstashwriter.cpp +++ b/lib/perfdata/logstashwriter.cpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -22,6 +22,7 @@ #include "icinga/service.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" +#include "icinga/perfdatavalue.hpp" #include "icinga/notification.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -30,6 +31,7 @@ #include "base/perfdatavalue.hpp" #include "base/stream.hpp" #include "base/networkstream.hpp" + #include "base/json.hpp" #include "base/context.hpp" #include @@ -40,135 +42,129 @@ using namespace icinga; REGISTER_TYPE(LogstashWriter); + void LogstashWriter::Start(bool runtimeCreated) { - ObjectImpl::Start(runtimeCreated); + ObjectImpl::Start(runtimeCreated); - m_ReconnectTimer = new Timer(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); - // Send check results - Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); - // Send notifications - Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); - // Send state change - Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); + // Send check results + Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); + // Send notifications + Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); + // Send state change + Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); } + void LogstashWriter::ReconnectTimerHandler(void) { if (m_Stream) return; - Socket::Ptr socket; - - if (GetSocketType() == "tcp") + if(GetDefaultProtocol() == true) socket = new TcpSocket(); else socket = new UdpSocket(); - Log(LogNotice, "LogstashWriter") - << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogNotice, "LogstashWriter") + << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogCritical, "LogstashWriter") - << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - return; - } + try { + socket->Connect(GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogCritical, "LogstashWriter") + << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + return; + } - m_Stream = new NetworkStream(socket); + m_Stream = new NetworkStream(socket); } void LogstashWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { - CONTEXT("LOGSTASH Processing check result for '" + checkable->GetName() + "'"); + CONTEXT("LOGSTASGH Processing check result for '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing check result for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter")<< "Logstash Processing check result for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); - } + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } - fields->Set("host_name", host->GetName()); - fields->Set("type", "CheckResult"); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("hostname", host->GetName()); + fields->Set("type", "CHECK RESULT"); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("latency", cr->CalculateLatency()); - fields->Set("execution_time", cr->CalculateExecutionTime()); - fields->Set("reachable", checkable->IsReachable()); + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + fields->Set("reachable", checkable->IsReachable()); - double ts = Utility::GetTime(); + if (cr) { + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); + fields->Set("check_source", cr->GetCheckSource()); + } - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - fields->Set("check_source", cr->GetCheckSource()); - ts = cr->GetExecutionEnd(); - } + if (GetEnableSendPerfdata()) { + Array::Ptr perfdata = cr->GetPerformanceData(); - Array::Ptr perfdata = cr->GetPerformanceData(); + if (perfdata) { + ObjectLock olock(perfdata); + BOOST_FOREACH(const Value& val, perfdata) { + PerfdataValue::Ptr pdv; - if (perfdata) { - Dictionary::Ptr perfdataItems = new Dictionary(); + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + String escaped_key = pdv->GetLabel(); + boost::replace_all(escaped_key, " ", "_"); + boost::replace_all(escaped_key, ".", "_"); + boost::replace_all(escaped_key, "\\", "_"); + boost::algorithm::replace_all(escaped_key, "::", "."); + fields->Set(escaped_key, pdv->GetValue()); - ObjectLock olock(perfdata); - for (const Value& val : perfdata) { - PerfdataValue::Ptr pdv; + if (pdv->GetMin()) + fields->Set(escaped_key + "_min", pdv->GetMin()); + if (pdv->GetMax()) + fields->Set(escaped_key + "_max", pdv->GetMax()); + if (pdv->GetWarn()) + fields->Set(escaped_key + "_warn", pdv->GetWarn()); + if (pdv->GetCrit()) + fields->Set(escaped_key + "_crit", pdv->GetCrit()); + } catch (const std::exception&) { + Log(LogWarning, "LogstashWriter") + << "Ignoring invalid perfdata value: '" << val << "' for object '" + << checkable-GetName() << "'."; + } + } + } + } + } - if (val.IsObjectType()) - pdv = val; - else { - try { - pdv = PerfdataValue::Parse(val); - } catch (const std::exception&) { - Log(LogWarning, "LogstashWriter") - << "Ignoring invalid perfdata value: '" << val << "' for object '" - << checkable->GetName() << "'."; - continue; - } - } - - Dictionary::Ptr perfdataItem = new Dictionary(); - perfdataItem->Set("value", pdv->GetValue()); - - if (pdv->GetMin()) - perfdataItem->Set("min", pdv->GetMin()); - if (pdv->GetMax()) - perfdataItem->Set("max", pdv->GetMax()); - if (pdv->GetWarn()) - perfdataItem->Set("warn", pdv->GetWarn()); - if (pdv->GetCrit()) - perfdataItem->Set("crit", pdv->GetCrit()); - - String escaped_key = EscapeMetricLabel(pdv->GetLabel()); - - perfdataItems->Set(escaped_key, perfdataItem); - } - - fields->Set("performance_data", perfdataItems); - } - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } @@ -176,87 +172,85 @@ void LogstashWriter::NotificationToUserHandler(const Notification::Ptr& notifica const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, const String& author, const String& comment_text, const String& command_name) { - CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing notification for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Logstash Processing notification for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - String notification_type_str = Notification::NotificationTypeToString(notification_type); + String notification_type_str = Notification::NotificationTypeToString(notification_type); - String author_comment = ""; + String author_comment = ""; - if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { - author_comment = author + ";" + comment_text; - } + if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { + author_comment = author + ";" + comment_text; + } - double ts = Utility::GetTime(); + String output; + if (cr) + output = CompatUtility::GetCheckResultOutput(cr); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("type", "SERVICE NOTIFICATION"); - fields->Set("service_name", service->GetShortName()); - } else { - fields->Set("type", "HOST NOTIFICATION"); - } + if (service) { + fields->Set("type", "SERVICE NOTIFICATION"); + fields->Set("service", service->GetShortName()); + fields->Set("short_message", output); + } else { + fields->Set("type", "HOST NOTIFICATION"); + fields->Set("short_message", CompatUtility::GetHostStateString(host)+ ")"); + } - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - ts = cr->GetExecutionEnd(); - } + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("hostname", host->GetName()); + fields->Set("command", command_name); + fields->Set("notification_type", notification_type_str); + fields->Set("comment", author_comment); - fields->Set("host_name", host->GetName()); - fields->Set("command", command_name); - fields->Set("notification_type", notification_type_str); - fields->Set("comment", author_comment); - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } void LogstashWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) { - CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing state change for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Logstash Processing state change for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("type", "StateChange"); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("hostname", host->GetName()); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("type", "STATE CHANGE"); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("hostname", host->GetName()); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); } - double ts = Utility::GetTime(); - - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - fields->Set("check_source", cr->GetCheckSource()); - ts = cr->GetExecutionEnd(); - } - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + if (cr) { + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); + fields->Set("check_source", cr->GetCheckSource()); + } + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts) @@ -264,8 +258,8 @@ String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, con fields->Set("version", "1.1"); fields->Set("host", source); fields->Set("timestamp", ts); - - return JsonEncode(fields) + "\n"; + String logstashObj= JsonEncode(fields); + return logstashObj+ "\n"; } void LogstashWriter::SendLogMessage(const String& message) @@ -276,32 +270,12 @@ void LogstashWriter::SendLogMessage(const String& message) return; try { - m_Stream->Write(&message[0], message.GetLength()); + m_Stream->Write(&message[0], message.GetLength()); } catch (const std::exception& ex) { - Log(LogCritical, "LogstashWriter") - << "Cannot write to " << GetSocketType() - << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogCritical, "LogstashWriter") << "Cannot write to " << + ((GetDefaultProtocol()==true) ? "tcp" : "udp") << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; m_Stream.reset(); } } -String LogstashWriter::EscapeMetricLabel(const String& str) -{ - String result = str; - - boost::replace_all(result, " ", "_"); - boost::replace_all(result, ".", "_"); - boost::replace_all(result, "\\", "_"); - boost::replace_all(result, "::", "."); - - return result; -} - -void LogstashWriter::ValidateSocketType(const String& value, const ValidationUtils& utils) -{ - ObjectImpl::ValidateSocketType(value, utils); - - if (value != "udp" && value != "tcp") - BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("socket_type"), "Socket type '" + value + "' is invalid.")); -} diff --git a/lib/perfdata/logstashwriter.hpp b/lib/perfdata/logstashwriter.hpp deleted file mode 100644 index 2487b29e9..000000000 --- a/lib/perfdata/logstashwriter.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef LOGSTASHWRITER_H -#define LOGSTASHWRITER_H - -#include "perfdata/logstashwriter.thpp" -#include "icinga/service.hpp" -#include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/udpsocket.hpp" -#include "base/timer.hpp" -#include -#include - -namespace icinga -{ - -/** - * An Icinga logstash writer. - * - * @ingroup perfdata - */ -class LogstashWriter : public ObjectImpl -{ - -public: - DECLARE_OBJECT(LogstashWriter); - DECLARE_OBJECTNAME(LogstashWriter); - - virtual void ValidateSocketType(const String& value, const ValidationUtils& utils) override; - -protected: - virtual void Start(bool runtimeCreated) override; - -private: - Stream::Ptr m_Stream; - - Timer::Ptr m_ReconnectTimer; - - void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, - const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, - const String& author, const String& comment_text, const String& command_name); - void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); - void SendLogMessage(const String& message); - String ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts); - - static String EscapeMetricLabel(const String& str); - - void ReconnectTimerHandler(void); -}; - -} - -#endif /* LOGSTASHWRITER_H */ diff --git a/lib/perfdata/logstashwriter.ti b/lib/perfdata/logstashwriter.ti deleted file mode 100644 index b64bcf8db..000000000 --- a/lib/perfdata/logstashwriter.ti +++ /dev/null @@ -1,46 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "base/configobject.hpp" - -library perfdata; - -namespace icinga -{ - -class LogstashWriter : ConfigObject -{ - [config] String host { - default {{{ return "127.0.0.1"; }}} - }; - - [config] String port { - default {{{ return "9201"; }}} - }; - - [config] String socket_type { - default {{{ return "udp"; }}} - }; - - [config] String source { - default {{{ return "icinga2"; }}} - }; -}; - -}