diff --git a/etc/icinga2/features-available/logstash.conf b/etc/icinga2/features-available/logstash.conf index 2c506ea7e..6a08f16fc 100644 --- a/etc/icinga2/features-available/logstash.conf +++ b/etc/icinga2/features-available/logstash.conf @@ -8,5 +8,6 @@ library "perfdata" object LogstashWriter "logstash" { //host = "127.0.0.1" //port = 9201 - //socket_type = "udp" + /* default is tcp */ + //defaultProtocol = true } diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index ed9d4ebd8..2016cb76d 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -38,21 +38,14 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET), m_SocketType(SOCK_STREAM), m_Protocol(IPPROTO_TCP) -{ } - -/** - * Constructor for the Socket class. - */ -Socket::Socket(int socketType, int protocol) - : m_FD(INVALID_SOCKET), m_SocketType(socketType), m_Protocol(protocol) + : m_FD(INVALID_SOCKET) { } /** * Constructor for the Socket class. */ Socket::Socket(SOCKET fd) - : m_FD(INVALID_SOCKET) + : m_FD(INVALID_SOCKET) { SetFD(fd); } @@ -337,9 +330,6 @@ size_t Socket::Read(void *buffer, size_t count) */ Socket::Ptr Socket::Accept(void) { - if (m_Protocol == IPPROTO_UDP) - BOOST_THROW_EXCEPTION(std::runtime_error("Accept cannot be used for UDP sockets.")); - int fd; sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -438,6 +428,7 @@ void Socket::SocketPair(SOCKET s[2]) * * @param node The node. * @param service The service. + * @param protocol The protocol */ void Socket::Connect(const String& node, const String& service) { @@ -445,16 +436,16 @@ void Socket::Connect(const String& node, const String& service) addrinfo *result; int error; const char *func; - + + SocketType(); memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; - hints.ai_socktype = m_SocketType; - hints.ai_protocol = m_Protocol; - + hints.ai_socktype = socktype; + hints.ai_protocol = protocol; int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); if (rc != 0) { - Log(LogCritical, "Socket") + Log(LogCritical, protocol+"Socket") << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; BOOST_THROW_EXCEPTION(socket_error() @@ -501,7 +492,7 @@ void Socket::Connect(const String& node, const String& service) freeaddrinfo(result); if (GetFD() == INVALID_SOCKET) { - Log(LogCritical, "Socket") + Log(LogCritical, "UdpSocket") << "Invalid socket: " << Utility::FormatErrorNumber(error); #ifndef _WIN32 diff --git a/lib/base/socket.hpp b/lib/base/socket.hpp index 107b2a276..df5bff122 100644 --- a/lib/base/socket.hpp +++ b/lib/base/socket.hpp @@ -59,19 +59,20 @@ public: void MakeNonBlocking(void); static void SocketPair(SOCKET s[2]); - + protected: - Socket(int socketType, int protocol); - void SetFD(SOCKET fd); int GetError(void) const; + int socktype; + int protocol; + + virtual void SocketType(){}; + mutable boost::mutex m_SocketMutex; private: SOCKET m_FD; /**< The socket descriptor. */ - int m_SocketType; - int m_Protocol; static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); }; diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index a725f2520..22ff6076c 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -27,12 +27,10 @@ using namespace icinga; -/** - * Constructor for the TcpSocket class. - */ -TcpSocket::TcpSocket(void) - : Socket(SOCK_STREAM, IPPROTO_TCP) -{ } +void TcpSocket::SocketType(){ + socktype = SOCK_STREAM; + protocol = IPPROTO_TCP; +} /** * Creates a socket and binds it to the specified service. @@ -138,3 +136,97 @@ void TcpSocket::Bind(const String& node, const String& service, int family) #endif /* _WIN32 */ } } + +/** + * Creates a socket and connects to the specified node and service. + * + * @param node The node. + * @param service The service. + */ +void TcpSocket::Connect(const String& node, const String& service) +{ + addrinfo hints; + addrinfo *result; + int error; + const char *func; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); + + if (rc != 0) { + Log(LogCritical, "TcpSocket") + << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\""; + + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("getaddrinfo") + << errinfo_getaddrinfo_error(rc)); + } + + int fd = INVALID_SOCKET; + + for (addrinfo *info = result; info != NULL; info = info->ai_next) { + fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + + if (fd == INVALID_SOCKET) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "socket"; + + continue; + } + + const int optTrue = 1; + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast(&optTrue), sizeof(optTrue)) != 0) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + Log(LogWarning, "TcpSocket") + << "setsockopt() unable to enable TCP keep-alives with error code " << rc; + } + + rc = connect(fd, info->ai_addr, info->ai_addrlen); + + if (rc < 0) { +#ifdef _WIN32 + error = WSAGetLastError(); +#else /* _WIN32 */ + error = errno; +#endif /* _WIN32 */ + func = "connect"; + + closesocket(fd); + + continue; + } + + SetFD(fd); + + break; + } + + freeaddrinfo(result); + + if (GetFD() == INVALID_SOCKET) { + Log(LogCritical, "TcpSocket") + << "Invalid socket: " << Utility::FormatErrorNumber(error); + +#ifndef _WIN32 + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << boost::errinfo_errno(error)); +#else /* _WIN32 */ + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function(func) + << errinfo_win32_error(error)); +#endif /* _WIN32 */ + } +} diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 24cf1b67e..3f4a74a7b 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -36,10 +36,11 @@ class I2_BASE_API TcpSocket : public Socket public: DECLARE_PTR_TYPEDEFS(TcpSocket); - TcpSocket(void); - void Bind(const String& service, int family); void Bind(const String& node, const String& service, int family); + +private: + void SocketType(); }; } diff --git a/lib/base/udpsocket.cpp b/lib/base/udpsocket.cpp index c61f4a5ce..2b219c992 100644 --- a/lib/base/udpsocket.cpp +++ b/lib/base/udpsocket.cpp @@ -27,10 +27,7 @@ using namespace icinga; -/** - * Constructor for the UdpSocket class. - */ -UdpSocket::UdpSocket(void) - : Socket(SOCK_DGRAM, IPPROTO_UDP) -{ } - +void UdpSocket::SocketType(){ + socktype = SOCK_DGRAM; + protocol = IPPROTO_UDP; +} diff --git a/lib/base/udpsocket.hpp b/lib/base/udpsocket.hpp index 815fbaba5..d3b8fc275 100644 --- a/lib/base/udpsocket.hpp +++ b/lib/base/udpsocket.hpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -34,11 +34,11 @@ namespace icinga class I2_BASE_API UdpSocket : public Socket { public: - DECLARE_PTR_TYPEDEFS(UdpSocket); + DECLARE_PTR_TYPEDEFS(UdpSocket); - UdpSocket(void); +private: + void SocketType(); }; - } #endif /* UDPSOCKET_H */ diff --git a/lib/perfdata/logstashwriter.cpp b/lib/perfdata/logstashwriter.cpp index dde0bd678..523c9ef46 100644 --- a/lib/perfdata/logstashwriter.cpp +++ b/lib/perfdata/logstashwriter.cpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -22,6 +22,7 @@ #include "icinga/service.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" +#include "icinga/perfdatavalue.hpp" #include "icinga/notification.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -30,6 +31,7 @@ #include "base/perfdatavalue.hpp" #include "base/stream.hpp" #include "base/networkstream.hpp" + #include "base/json.hpp" #include "base/context.hpp" #include @@ -40,135 +42,129 @@ using namespace icinga; REGISTER_TYPE(LogstashWriter); + void LogstashWriter::Start(bool runtimeCreated) { - ObjectImpl::Start(runtimeCreated); + ObjectImpl::Start(runtimeCreated); - m_ReconnectTimer = new Timer(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this)); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); - // Send check results - Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); - // Send notifications - Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); - // Send state change - Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); + // Send check results + Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2)); + // Send notifications + Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8)); + // Send state change + Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3)); } + void LogstashWriter::ReconnectTimerHandler(void) { if (m_Stream) return; - Socket::Ptr socket; - - if (GetSocketType() == "tcp") + if(GetDefaultProtocol() == true) socket = new TcpSocket(); else socket = new UdpSocket(); - Log(LogNotice, "LogstashWriter") - << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogNotice, "LogstashWriter") + << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogCritical, "LogstashWriter") - << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; - return; - } + try { + socket->Connect(GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogCritical, "LogstashWriter") + << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'."; + return; + } - m_Stream = new NetworkStream(socket); + m_Stream = new NetworkStream(socket); } void LogstashWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { - CONTEXT("LOGSTASH Processing check result for '" + checkable->GetName() + "'"); + CONTEXT("LOGSTASGH Processing check result for '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing check result for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter")<< "Logstash Processing check result for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); - } + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } - fields->Set("host_name", host->GetName()); - fields->Set("type", "CheckResult"); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("hostname", host->GetName()); + fields->Set("type", "CHECK RESULT"); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("latency", cr->CalculateLatency()); - fields->Set("execution_time", cr->CalculateExecutionTime()); - fields->Set("reachable", checkable->IsReachable()); + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + fields->Set("reachable", checkable->IsReachable()); - double ts = Utility::GetTime(); + if (cr) { + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); + fields->Set("check_source", cr->GetCheckSource()); + } - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - fields->Set("check_source", cr->GetCheckSource()); - ts = cr->GetExecutionEnd(); - } + if (GetEnableSendPerfdata()) { + Array::Ptr perfdata = cr->GetPerformanceData(); - Array::Ptr perfdata = cr->GetPerformanceData(); + if (perfdata) { + ObjectLock olock(perfdata); + BOOST_FOREACH(const Value& val, perfdata) { + PerfdataValue::Ptr pdv; - if (perfdata) { - Dictionary::Ptr perfdataItems = new Dictionary(); + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + String escaped_key = pdv->GetLabel(); + boost::replace_all(escaped_key, " ", "_"); + boost::replace_all(escaped_key, ".", "_"); + boost::replace_all(escaped_key, "\\", "_"); + boost::algorithm::replace_all(escaped_key, "::", "."); + fields->Set(escaped_key, pdv->GetValue()); - ObjectLock olock(perfdata); - for (const Value& val : perfdata) { - PerfdataValue::Ptr pdv; + if (pdv->GetMin()) + fields->Set(escaped_key + "_min", pdv->GetMin()); + if (pdv->GetMax()) + fields->Set(escaped_key + "_max", pdv->GetMax()); + if (pdv->GetWarn()) + fields->Set(escaped_key + "_warn", pdv->GetWarn()); + if (pdv->GetCrit()) + fields->Set(escaped_key + "_crit", pdv->GetCrit()); + } catch (const std::exception&) { + Log(LogWarning, "LogstashWriter") + << "Ignoring invalid perfdata value: '" << val << "' for object '" + << checkable-GetName() << "'."; + } + } + } + } + } - if (val.IsObjectType()) - pdv = val; - else { - try { - pdv = PerfdataValue::Parse(val); - } catch (const std::exception&) { - Log(LogWarning, "LogstashWriter") - << "Ignoring invalid perfdata value: '" << val << "' for object '" - << checkable->GetName() << "'."; - continue; - } - } - - Dictionary::Ptr perfdataItem = new Dictionary(); - perfdataItem->Set("value", pdv->GetValue()); - - if (pdv->GetMin()) - perfdataItem->Set("min", pdv->GetMin()); - if (pdv->GetMax()) - perfdataItem->Set("max", pdv->GetMax()); - if (pdv->GetWarn()) - perfdataItem->Set("warn", pdv->GetWarn()); - if (pdv->GetCrit()) - perfdataItem->Set("crit", pdv->GetCrit()); - - String escaped_key = EscapeMetricLabel(pdv->GetLabel()); - - perfdataItems->Set(escaped_key, perfdataItem); - } - - fields->Set("performance_data", perfdataItems); - } - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } @@ -176,87 +172,85 @@ void LogstashWriter::NotificationToUserHandler(const Notification::Ptr& notifica const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, const String& author, const String& comment_text, const String& command_name) { - CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing notification for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Logstash Processing notification for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - String notification_type_str = Notification::NotificationTypeToString(notification_type); + String notification_type_str = Notification::NotificationTypeToString(notification_type); - String author_comment = ""; + String author_comment = ""; - if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { - author_comment = author + ";" + comment_text; - } + if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) { + author_comment = author + ";" + comment_text; + } - double ts = Utility::GetTime(); + String output; + if (cr) + output = CompatUtility::GetCheckResultOutput(cr); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - if (service) { - fields->Set("type", "SERVICE NOTIFICATION"); - fields->Set("service_name", service->GetShortName()); - } else { - fields->Set("type", "HOST NOTIFICATION"); - } + if (service) { + fields->Set("type", "SERVICE NOTIFICATION"); + fields->Set("service", service->GetShortName()); + fields->Set("short_message", output); + } else { + fields->Set("type", "HOST NOTIFICATION"); + fields->Set("short_message", CompatUtility::GetHostStateString(host)+ ")"); + } - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - ts = cr->GetExecutionEnd(); - } + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("hostname", host->GetName()); + fields->Set("command", command_name); + fields->Set("notification_type", notification_type_str); + fields->Set("comment", author_comment); - fields->Set("host_name", host->GetName()); - fields->Set("command", command_name); - fields->Set("notification_type", notification_type_str); - fields->Set("comment", author_comment); - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } void LogstashWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) { - CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); + CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'"); - Log(LogDebug, "LogstashWriter") - << "Processing state change for '" << checkable->GetName() << "'"; + Log(LogDebug, "LogstashWriter") + << "Logstash Processing state change for '" << checkable->GetName() << "'"; - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + double ts = cr->GetExecutionEnd(); - Dictionary::Ptr fields = new Dictionary(); + Dictionary::Ptr fields = new Dictionary(); - fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); - fields->Set("type", "StateChange"); - fields->Set("current_check_attempt", checkable->GetCheckAttempt()); - fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); - fields->Set("hostname", host->GetName()); + fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("type", "STATE CHANGE"); + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("hostname", host->GetName()); - if (service) { - fields->Set("service_name", service->GetShortName()); - fields->Set("service_state", Service::StateToString(service->GetState())); - fields->Set("last_state", service->GetLastState()); - fields->Set("last_hard_state", service->GetLastHardState()); - } else { - fields->Set("last_state", host->GetLastState()); - fields->Set("last_hard_state", host->GetLastHardState()); + if (service) { + fields->Set("service_name", service->GetShortName()); + fields->Set("service_state", Service::StateToString(service->GetState())); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); } - double ts = Utility::GetTime(); - - if (cr) { - fields->Set("plugin_output", cr->GetOutput()); - fields->Set("check_source", cr->GetCheckSource()); - ts = cr->GetExecutionEnd(); - } - - SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); + if (cr) { + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr)); + fields->Set("check_source", cr->GetCheckSource()); + } + SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts)); } String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts) @@ -264,8 +258,8 @@ String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, con fields->Set("version", "1.1"); fields->Set("host", source); fields->Set("timestamp", ts); - - return JsonEncode(fields) + "\n"; + String logstashObj= JsonEncode(fields); + return logstashObj+ "\n"; } void LogstashWriter::SendLogMessage(const String& message) @@ -276,32 +270,12 @@ void LogstashWriter::SendLogMessage(const String& message) return; try { - m_Stream->Write(&message[0], message.GetLength()); + m_Stream->Write(&message[0], message.GetLength()); } catch (const std::exception& ex) { - Log(LogCritical, "LogstashWriter") - << "Cannot write to " << GetSocketType() - << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + Log(LogCritical, "LogstashWriter") << "Cannot write to " << + ((GetDefaultProtocol()==true) ? "tcp" : "udp") << " socket on host '" << GetHost() << "' port '" << GetPort() << "'."; m_Stream.reset(); } } -String LogstashWriter::EscapeMetricLabel(const String& str) -{ - String result = str; - - boost::replace_all(result, " ", "_"); - boost::replace_all(result, ".", "_"); - boost::replace_all(result, "\\", "_"); - boost::replace_all(result, "::", "."); - - return result; -} - -void LogstashWriter::ValidateSocketType(const String& value, const ValidationUtils& utils) -{ - ObjectImpl::ValidateSocketType(value, utils); - - if (value != "udp" && value != "tcp") - BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("socket_type"), "Socket type '" + value + "' is invalid.")); -} diff --git a/lib/perfdata/logstashwriter.hpp b/lib/perfdata/logstashwriter.hpp index 2487b29e9..65c46f51b 100644 --- a/lib/perfdata/logstashwriter.hpp +++ b/lib/perfdata/logstashwriter.hpp @@ -1,6 +1,6 @@ /****************************************************************************** * Icinga 2 * - * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * @@ -39,32 +39,27 @@ namespace icinga */ class LogstashWriter : public ObjectImpl { - public: - DECLARE_OBJECT(LogstashWriter); - DECLARE_OBJECTNAME(LogstashWriter); - - virtual void ValidateSocketType(const String& value, const ValidationUtils& utils) override; + DECLARE_OBJECT(LogstashWriter); + DECLARE_OBJECTNAME(LogstashWriter); protected: - virtual void Start(bool runtimeCreated) override; + virtual void Start(bool runtimeCreated) override; private: - Stream::Ptr m_Stream; + Stream::Ptr m_Stream; - Timer::Ptr m_ReconnectTimer; + Timer::Ptr m_ReconnectTimer; - void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, - const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, - const String& author, const String& comment_text, const String& command_name); - void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); - void SendLogMessage(const String& message); + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr, + const String& author, const String& comment_text, const String& command_name); + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void SendLogMessage(const String& message); String ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts); - static String EscapeMetricLabel(const String& str); - - void ReconnectTimerHandler(void); + void ReconnectTimerHandler(void); }; } diff --git a/lib/perfdata/logstashwriter.ti b/lib/perfdata/logstashwriter.ti index b64bcf8db..6d822c995 100644 --- a/lib/perfdata/logstashwriter.ti +++ b/lib/perfdata/logstashwriter.ti @@ -33,14 +33,18 @@ class LogstashWriter : ConfigObject [config] String port { default {{{ return "9201"; }}} }; - - [config] String socket_type { - default {{{ return "udp"; }}} + + [config] bool defaultProtocol { + default {{{ return "true"; }}} }; [config] String source { default {{{ return "icinga2"; }}} }; + + [config] bool enable_send_perfdata { + default {{{ return false; }}} + }; }; }