From d26aa9fb34f1d6ebaa4e420e9b573433c6b2f9bf Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 14 Apr 2021 18:52:36 +0200 Subject: [PATCH 1/2] InfluxdbCommonWriter: abstract InfluxdbWriter refs #8711 --- doc/09-object-types.md | 2 +- doc/13-addons.md | 2 +- doc/14-features.md | 2 +- etc/icinga2/features-available/influxdb.conf | 2 +- lib/perfdata/CMakeLists.txt | 2 + lib/perfdata/influxdbcommonwriter.cpp | 586 ++++++++++++++++++ lib/perfdata/influxdbcommonwriter.hpp | 99 +++ lib/perfdata/influxdbcommonwriter.ti | 85 +++ lib/perfdata/influxdbwriter.cpp | 601 +------------------ lib/perfdata/influxdbwriter.hpp | 39 +- lib/perfdata/influxdbwriter.ti | 72 +-- 11 files changed, 800 insertions(+), 692 deletions(-) create mode 100644 lib/perfdata/influxdbcommonwriter.cpp create mode 100644 lib/perfdata/influxdbcommonwriter.hpp create mode 100644 lib/perfdata/influxdbcommonwriter.ti diff --git a/doc/09-object-types.md b/doc/09-object-types.md index a1ac37cea..b87eff813 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1604,7 +1604,7 @@ Runtime Attributes: ### InfluxdbWriter -Writes check result metrics and performance data to a defined InfluxDB host. +Writes check result metrics and performance data to a defined InfluxDB v1 host. This configuration object is available as [influxdb feature](14-features.md#influxdb-writer). Example: diff --git a/doc/13-addons.md b/doc/13-addons.md index 4923f36f0..fd65b4b02 100644 --- a/doc/13-addons.md +++ b/doc/13-addons.md @@ -57,7 +57,7 @@ Integration in Icinga Web 2 is possible by installing the official [graphite mod It’s written in Go and has no external dependencies. Use the [InfluxdbWriter](14-features.md#influxdb-writer) feature -for sending real-time metrics from Icinga 2 to InfluxDB. +for sending real-time metrics from Icinga 2 to InfluxDB v1. ```bash icinga2 feature enable influxdb diff --git a/doc/14-features.md b/doc/14-features.md index 4a1809828..10656331a 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -369,7 +369,7 @@ where Carbon Cache/Relay is running as receiver. ### InfluxDB Writer Once there are new metrics available, Icinga 2 will directly write them to the -defined InfluxDB HTTP API. +defined InfluxDB v1 HTTP API. You can enable the feature using diff --git a/etc/icinga2/features-available/influxdb.conf b/etc/icinga2/features-available/influxdb.conf index af8423577..f0af37bdb 100644 --- a/etc/icinga2/features-available/influxdb.conf +++ b/etc/icinga2/features-available/influxdb.conf @@ -1,6 +1,6 @@ /** * The InfluxdbWriter type writes check result metrics and - * performance data to an InfluxDB HTTP API + * performance data to an InfluxDB v1 HTTP API */ object InfluxdbWriter "influxdb" { diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index c7fba5851..0e54ec832 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -2,6 +2,7 @@ mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp) mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp) +mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp) mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp) mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp) mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp) @@ -11,6 +12,7 @@ set(perfdata_SOURCES elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp + influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp new file mode 100644 index 000000000..d85426f44 --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -0,0 +1,586 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbcommonwriter.hpp" +#include "perfdata/influxdbcommonwriter-ti.cpp" +#include "remote/url.hpp" +#include "icinga/service.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/icingaapplication.hpp" +#include "icinga/checkcommand.hpp" +#include "base/application.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/stream.hpp" +#include "base/json.hpp" +#include "base/networkstream.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include "base/tlsutility.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace icinga; + +REGISTER_TYPE(InfluxdbCommonWriter); + +class InfluxdbInteger final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(InfluxdbInteger); + + InfluxdbInteger(int value) + : m_Value(value) + { } + + int GetValue() const + { + return m_Value; + } + +private: + int m_Value; +}; + +void InfluxdbCommonWriter::OnConfigLoaded() +{ + ObjectImpl::OnConfigLoaded(); + + m_WorkQueue.SetName(GetReflectionType()->GetName() + ", " + GetName()); + + if (!GetEnableHa()) { + Log(LogDebug, GetReflectionType()->GetName()) + << "HA functionality disabled. Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +void InfluxdbCommonWriter::Resume() +{ + ObjectImpl::Resume(); + + Log(LogInformation, GetReflectionType()->GetName()) + << "'" << GetName() << "' resumed."; + + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = new Timer(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + + /* Register for new metrics. */ + Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); +} + +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ +void InfluxdbCommonWriter::Pause() +{ + /* Force a flush. */ + Log(LogDebug, GetReflectionType()->GetName()) + << "Flushing pending data buffers."; + + Flush(); + + /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ + Log(LogDebug, GetReflectionType()->GetName()) + << "Joining existing WQ tasks."; + + m_WorkQueue.Join(); + + /* Flush again after the WQ tasks have filled the data buffer. */ + Log(LogDebug, GetReflectionType()->GetName()) + << "Flushing data buffers from WQ tasks."; + + Flush(); + + Log(LogInformation, GetReflectionType()->GetName()) + << "'" << GetName() << "' paused."; + + ObjectImpl::Pause(); +} + +void InfluxdbCommonWriter::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, GetReflectionType()->GetName(), "Exception during InfluxDB operation: Verify that your backend is operational!"); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); + + //TODO: Close the connection, if we keep it open. +} + +OptionalTlsStream InfluxdbCommonWriter::Connect() +{ + Log(LogNotice, GetReflectionType()->GetName()) + << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + + OptionalTlsStream stream; + bool ssl = GetSslEnable(); + + if (ssl) { + Shared::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unable to create SSL context."; + throw; + } + + stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + stream.second = Shared::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (ssl) { + auto& tlsStream (stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "TLS handshake with host '" << GetHost() << "' failed."; + throw; + } + } + + return std::move(stream); +} + +void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow); +} + +void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("Processing check result for '" + checkable->GetName() + "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + MacroProcessor::ResolverList resolvers; + if (service) + resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + resolvers.emplace_back("icinga", IcingaApplication::GetInstance()); + + String prefix; + + double ts = cr->GetExecutionEnd(); + + // Clone the template and perform an in-place macro expansion of measurement and tag values + Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate(); + Dictionary::Ptr tmpl = static_pointer_cast(tmpl_clean->ShallowClone()); + tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr)); + + Dictionary::Ptr tagsClean = tmpl->Get("tags"); + if (tagsClean) { + Dictionary::Ptr tags = new Dictionary(); + + { + ObjectLock olock(tagsClean); + for (const Dictionary::Pair& pair : tagsClean) { + String missing_macro; + Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); + + if (missing_macro.IsEmpty()) { + tags->Set(pair.first, value); + } + } + } + + tmpl->Set("tags", tags); + } + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (perfdata) { + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << checkCommand->GetName() << "' with value: " << val; + continue; + } + } + + Dictionary::Ptr fields = new Dictionary(); + fields->Set("value", pdv->GetValue()); + + if (GetEnableSendThresholds()) { + if (!pdv->GetCrit().IsEmpty()) + fields->Set("crit", pdv->GetCrit()); + if (!pdv->GetWarn().IsEmpty()) + fields->Set("warn", pdv->GetWarn()); + if (!pdv->GetMin().IsEmpty()) + fields->Set("min", pdv->GetMin()); + if (!pdv->GetMax().IsEmpty()) + fields->Set("max", pdv->GetMax()); + } + if (!pdv->GetUnit().IsEmpty()) { + fields->Set("unit", pdv->GetUnit()); + } + + SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts); + } + } + + if (GetEnableSendMetadata()) { + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) + fields->Set("state", new InfluxdbInteger(service->GetState())); + else + fields->Set("state", new InfluxdbInteger(host->GetState())); + + fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt())); + fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts())); + fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType())); + fields->Set("reachable", checkable->IsReachable()); + fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth())); + fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement())); + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + + SendMetric(checkable, tmpl, Empty, fields, ts); + } +} + +String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str) +{ + // Iterate over the key name and escape commas and spaces with a backslash + String result = str; + boost::algorithm::replace_all(result, "\"", "\\\""); + boost::algorithm::replace_all(result, "=", "\\="); + boost::algorithm::replace_all(result, ",", "\\,"); + boost::algorithm::replace_all(result, " ", "\\ "); + + // InfluxDB 'feature': although backslashes are allowed in keys they also act + // as escape sequences when followed by ',' or ' '. When your tag is like + // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped + // and through experimentation they also escape '='. To be safe we replace + // trailing backslashes with and underscore. + // See https://github.com/influxdata/influxdb/issues/8587 for more info + size_t length = result.GetLength(); + if (result[length - 1] == '\\') + result[length - 1] = '_'; + + return result; +} + +String InfluxdbCommonWriter::EscapeValue(const Value& value) +{ + if (value.IsObjectType()) { + std::ostringstream os; + os << static_cast(value)->GetValue() << "i"; + return os.str(); + } + + if (value.IsBoolean()) + return value ? "true" : "false"; + + if (value.IsString()) + return "\"" + EscapeKeyOrTagValue(value) + "\""; + + return value; +} + +void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, + const String& label, const Dictionary::Ptr& fields, double ts) +{ + std::ostringstream msgbuf; + msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement")); + + Dictionary::Ptr tags = tmpl->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + // Empty macro expansion, no tag + if (!pair.second.IsEmpty()) { + msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second); + } + } + } + + // Label may be empty in the case of metadata + if (!label.IsEmpty()) + msgbuf << ",metric=" << EscapeKeyOrTagValue(label); + + msgbuf << " "; + + { + bool first = true; + + ObjectLock fieldLock(fields); + for (const Dictionary::Pair& pair : fields) { + if (first) + first = false; + else + msgbuf << ","; + + msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second); + } + } + + msgbuf << " " << static_cast(ts); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'."; + + // Buffer the data point + m_DataBuffer.emplace_back(msgbuf.str()); + + // Flush if we've buffered too much to prevent excessive memory use + if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { + Log(LogDebug, GetReflectionType()->GetName()) + << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; + + try { + Flush(); + } catch (...) { + /* Do nothing. */ + } + } +} + +void InfluxdbCommonWriter::FlushTimeout() +{ + m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); +} + +void InfluxdbCommonWriter::FlushTimeoutWQ() +{ + AssertOnWorkQueue(); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + + Flush(); +} + +void InfluxdbCommonWriter::Flush() +{ + namespace beast = boost::beast; + namespace http = beast::http; + + /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ + if (m_DataBuffer.empty()) + return; + + Log(LogDebug, GetReflectionType()->GetName()) + << "Flushing data buffer to InfluxDB."; + + String body = boost::algorithm::join(m_DataBuffer, "\n"); + m_DataBuffer.clear(); + + OptionalTlsStream stream; + + try { + stream = Connect(); + } catch (const std::exception& ex) { + Log(LogWarning, "InfluxDbWriter") + << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); + return; + } + + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); + + auto request (AssembleRequest(std::move(body))); + + try { + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + http::parser parser; + beast::flat_buffer buf; + + try { + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); + throw; + } + + auto& response (parser.get()); + + if (response.result() != http::status::no_content) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unexpected response code: " << response.result(); + + auto& contentType (response[http::field::content_type]); + if (contentType != "application/json") { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unexpected Content-Type: " << contentType; + return; + } + + Dictionary::Ptr jsonResponse; + auto& body (response.body()); + + try { + jsonResponse = JsonDecode(body); + } catch (...) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unable to parse JSON response:\n" << body; + return; + } + + String error = jsonResponse->Get("error"); + + Log(LogCritical, GetReflectionType()->GetName()) + << "InfluxDB error message:\n" << error; + } +} + +boost::beast::http::request InfluxdbCommonWriter::AssembleBaseRequest(String body) +{ + namespace http = boost::beast::http; + + auto url (AssembleUrl()); + http::request request (http::verb::post, std::string(url->Format(true)), 10); + + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + request.body() = std::move(body); + request.content_length(request.body().size()); + + return std::move(request); +} + +Url::Ptr InfluxdbCommonWriter::AssembleBaseUrl() +{ + Url::Ptr url = new Url(); + + url->SetScheme(GetSslEnable() ? "https" : "http"); + url->SetHost(GetHost()); + url->SetPort(GetPort()); + url->AddQueryElement("precision", "s"); + + return std::move(url); +} + +void InfluxdbCommonWriter::ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) +{ + ObjectImpl::ValidateHostTemplate(lvalue, utils); + + String measurement = lvalue()->Get("measurement"); + if (!MacroProcessor::ValidateMacroString(measurement)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} + +void InfluxdbCommonWriter::ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) +{ + ObjectImpl::ValidateServiceTemplate(lvalue, utils); + + String measurement = lvalue()->Get("measurement"); + if (!MacroProcessor::ValidateMacroString(measurement)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} + diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp new file mode 100644 index 000000000..06e860841 --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -0,0 +1,99 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#ifndef INFLUXDBCOMMONWRITER_H +#define INFLUXDBCOMMONWRITER_H + +#include "perfdata/influxdbcommonwriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/perfdatavalue.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include "base/tlsstream.hpp" +#include "base/workqueue.hpp" +#include "remote/url.hpp" +#include +#include +#include + +namespace icinga +{ + +/** + * Common base class for InfluxDB v1/v2 writers. + * + * @ingroup perfdata + */ +class InfluxdbCommonWriter : public ObjectImpl +{ +public: + DECLARE_OBJECT(InfluxdbCommonWriter); + + template + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + void ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; + void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; + +protected: + WorkQueue m_WorkQueue{10000000, 1}; + std::vector m_DataBuffer; + + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + + boost::beast::http::request AssembleBaseRequest(String body); + Url::Ptr AssembleBaseUrl(); + virtual boost::beast::http::request AssembleRequest(String body) = 0; + virtual Url::Ptr AssembleUrl() = 0; + +private: + Timer::Ptr m_FlushTimer; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, + const String& label, const Dictionary::Ptr& fields, double ts); + void FlushTimeout(); + void FlushTimeoutWQ(); + void Flush(); + + static String EscapeKeyOrTagValue(const String& str); + static String EscapeValue(const Value& value); + + OptionalTlsStream Connect(); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); +}; + +template +void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + auto typeName (InfluxWriter::TypeInstance->GetName().ToLower()); + + for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType()) { + size_t workQueueItems = influxwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + size_t dataBufferItems = influxwriter->m_DataBuffer.size(); + + nodes.emplace_back(influxwriter->GetName(), new Dictionary({ + { "work_queue_items", workQueueItems }, + { "work_queue_item_rate", workQueueItemRate }, + { "data_buffer_items", dataBufferItems } + })); + + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems)); + } + + status->Set(typeName, new Dictionary(std::move(nodes))); +} + +} + +#endif /* INFLUXDBCOMMONWRITER_H */ diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti new file mode 100644 index 000000000..7eb26dac9 --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -0,0 +1,85 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +abstract class InfluxdbCommonWriter : ConfigObject +{ + [config, required] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config, required] String port { + default {{{ return "8086"; }}} + }; + [config] bool ssl_enable { + default {{{ return false; }}} + }; + [config] String ssl_ca_cert { + default {{{ return ""; }}} + }; + [config] String ssl_cert { + default {{{ return ""; }}} + }; + [config] String ssl_key{ + default {{{ return ""; }}} + }; + [config, required] Dictionary::Ptr host_template { + default {{{ + return new Dictionary({ + { "measurement", "$host.check_command$" }, + { "tags", new Dictionary({ + { "hostname", "$host.name$" } + }) } + }); + }}} + }; + [config, required] Dictionary::Ptr service_template { + default {{{ + return new Dictionary({ + { "measurement", "$service.check_command$" }, + { "tags", new Dictionary({ + { "hostname", "$host.name$" }, + { "service", "$service.name$" } + }) } + }); + }}} + }; + [config] bool enable_send_thresholds { + default {{{ return false; }}} + }; + [config] bool enable_send_metadata { + default {{{ return false; }}} + }; + [config] int flush_interval { + default {{{ return 10; }}} + }; + [config] int flush_threshold { + default {{{ return 1024; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; +}; + +validator InfluxdbCommonWriter { + Dictionary host_template { + required measurement; + String measurement; + Dictionary "tags" { + String "*"; + }; + }; + Dictionary service_template { + required measurement; + String measurement; + Dictionary "tags" { + String "*"; + }; + }; +}; + +} diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index cde7a3653..30240f7f4 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -2,618 +2,55 @@ #include "perfdata/influxdbwriter.hpp" #include "perfdata/influxdbwriter-ti.cpp" -#include "remote/url.hpp" -#include "icinga/service.hpp" -#include "icinga/macroprocessor.hpp" -#include "icinga/icingaapplication.hpp" -#include "icinga/checkcommand.hpp" -#include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" -#include "base/configtype.hpp" -#include "base/objectlock.hpp" -#include "base/logger.hpp" -#include "base/convert.hpp" -#include "base/utility.hpp" -#include "base/perfdatavalue.hpp" -#include "base/stream.hpp" -#include "base/json.hpp" #include "base/base64.hpp" -#include "base/networkstream.hpp" -#include "base/exception.hpp" +#include "remote/url.hpp" +#include "base/configtype.hpp" +#include "base/perfdatavalue.hpp" #include "base/statsfunction.hpp" -#include "base/tlsutility.hpp" -#include -#include -#include -#include -#include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include #include using namespace icinga; -class InfluxdbInteger final : public Object -{ -public: - DECLARE_PTR_TYPEDEFS(InfluxdbInteger); - - InfluxdbInteger(int value) - : m_Value(value) - { } - - int GetValue() const - { - return m_Value; - } - -private: - int m_Value; -}; - REGISTER_TYPE(InfluxdbWriter); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); -void InfluxdbWriter::OnConfigLoaded() -{ - ObjectImpl::OnConfigLoaded(); - - m_WorkQueue.SetName("InfluxdbWriter, " + GetName()); - - if (!GetEnableHa()) { - Log(LogDebug, "InfluxdbWriter") - << "HA functionality disabled. Won't pause connection: " << GetName(); - - SetHAMode(HARunEverywhere); - } else { - SetHAMode(HARunOnce); - } -} - void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { - DictionaryData nodes; + InfluxdbCommonWriter::StatsFunc(status, perfdata); +} - for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType()) { - size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength(); - double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - size_t dataBufferItems = influxdbwriter->m_DataBuffer.size(); +boost::beast::http::request InfluxdbWriter::AssembleRequest(String body) +{ + auto request (AssembleBaseRequest(std::move(body))); + Dictionary::Ptr basicAuth = GetBasicAuth(); - nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({ - { "work_queue_items", workQueueItems }, - { "work_queue_item_rate", workQueueItemRate }, - { "data_buffer_items", dataBufferItems } - })); - - perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems)); - perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); - perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems)); + if (basicAuth) { + request.set( + boost::beast::http::field::authorization, + "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password")) + ); } - status->Set("influxdbwriter", new Dictionary(std::move(nodes))); + return std::move(request); } -void InfluxdbWriter::Resume() +Url::Ptr InfluxdbWriter::AssembleUrl() { - ObjectImpl::Resume(); - - Log(LogInformation, "InfluxdbWriter") - << "'" << GetName() << "' resumed."; - - /* Register exception handler for WQ tasks. */ - m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - - /* Setup timer for periodically flushing m_DataBuffer */ - m_FlushTimer = new Timer(); - m_FlushTimer->SetInterval(GetFlushInterval()); - m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); - m_FlushTimer->Start(); - m_FlushTimer->Reschedule(0); - - /* Register for new metrics. */ - Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { - CheckResultHandler(checkable, cr); - }); -} - -/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ -void InfluxdbWriter::Pause() -{ - /* Force a flush. */ - Log(LogDebug, "InfluxdbWriter") - << "Flushing pending data buffers."; - - Flush(); - - /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ - Log(LogDebug, "InfluxdbWriter") - << "Joining existing WQ tasks."; - - m_WorkQueue.Join(); - - /* Flush again after the WQ tasks have filled the data buffer. */ - Log(LogDebug, "InfluxdbWriter") - << "Flushing data buffers from WQ tasks."; - - Flush(); - - Log(LogInformation, "InfluxdbWriter") - << "'" << GetName() << "' paused."; - - ObjectImpl::Pause(); -} - -void InfluxdbWriter::AssertOnWorkQueue() -{ - ASSERT(m_WorkQueue.IsWorkerThread()); -} - -void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) -{ - Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!"); - - Log(LogDebug, "InfluxdbWriter") - << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); - - //TODO: Close the connection, if we keep it open. -} - -OptionalTlsStream InfluxdbWriter::Connect() -{ - Log(LogNotice, "InfluxdbWriter") - << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool ssl = GetSslEnable(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (ssl) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "TLS handshake with host '" << GetHost() << "' failed."; - throw; - } - } - - return std::move(stream); -} - -void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) -{ - if (IsPaused()) - return; - - m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow); -} - -void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) -{ - AssertOnWorkQueue(); - - CONTEXT("Processing check result for '" + checkable->GetName() + "'"); - - if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) - return; - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - MacroProcessor::ResolverList resolvers; - if (service) - resolvers.emplace_back("service", service); - resolvers.emplace_back("host", host); - resolvers.emplace_back("icinga", IcingaApplication::GetInstance()); - - String prefix; - - double ts = cr->GetExecutionEnd(); - - // Clone the template and perform an in-place macro expansion of measurement and tag values - Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate(); - Dictionary::Ptr tmpl = static_pointer_cast(tmpl_clean->ShallowClone()); - tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr)); - - Dictionary::Ptr tagsClean = tmpl->Get("tags"); - if (tagsClean) { - Dictionary::Ptr tags = new Dictionary(); - - { - ObjectLock olock(tagsClean); - for (const Dictionary::Pair& pair : tagsClean) { - String missing_macro; - Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); - - if (missing_macro.IsEmpty()) { - tags->Set(pair.first, value); - } - } - } - - tmpl->Set("tags", tags); - } - - CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); - - Array::Ptr perfdata = cr->GetPerformanceData(); - - if (perfdata) { - ObjectLock olock(perfdata); - for (const Value& val : perfdata) { - PerfdataValue::Ptr pdv; - - if (val.IsObjectType()) - pdv = val; - else { - try { - pdv = PerfdataValue::Parse(val); - } catch (const std::exception&) { - Log(LogWarning, "InfluxdbWriter") - << "Ignoring invalid perfdata for checkable '" - << checkable->GetName() << "' and command '" - << checkCommand->GetName() << "' with value: " << val; - continue; - } - } - - Dictionary::Ptr fields = new Dictionary(); - fields->Set("value", pdv->GetValue()); - - if (GetEnableSendThresholds()) { - if (!pdv->GetCrit().IsEmpty()) - fields->Set("crit", pdv->GetCrit()); - if (!pdv->GetWarn().IsEmpty()) - fields->Set("warn", pdv->GetWarn()); - if (!pdv->GetMin().IsEmpty()) - fields->Set("min", pdv->GetMin()); - if (!pdv->GetMax().IsEmpty()) - fields->Set("max", pdv->GetMax()); - } - if (!pdv->GetUnit().IsEmpty()) { - fields->Set("unit", pdv->GetUnit()); - } - - SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts); - } - } - - if (GetEnableSendMetadata()) { - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - Dictionary::Ptr fields = new Dictionary(); - - if (service) - fields->Set("state", new InfluxdbInteger(service->GetState())); - else - fields->Set("state", new InfluxdbInteger(host->GetState())); - - fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt())); - fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts())); - fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType())); - fields->Set("reachable", checkable->IsReachable()); - fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth())); - fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement())); - fields->Set("latency", cr->CalculateLatency()); - fields->Set("execution_time", cr->CalculateExecutionTime()); - - SendMetric(checkable, tmpl, Empty, fields, ts); - } -} - -String InfluxdbWriter::EscapeKeyOrTagValue(const String& str) -{ - // Iterate over the key name and escape commas and spaces with a backslash - String result = str; - boost::algorithm::replace_all(result, "\"", "\\\""); - boost::algorithm::replace_all(result, "=", "\\="); - boost::algorithm::replace_all(result, ",", "\\,"); - boost::algorithm::replace_all(result, " ", "\\ "); - - // InfluxDB 'feature': although backslashes are allowed in keys they also act - // as escape sequences when followed by ',' or ' '. When your tag is like - // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped - // and through experimentation they also escape '='. To be safe we replace - // trailing backslashes with and underscore. - // See https://github.com/influxdata/influxdb/issues/8587 for more info - size_t length = result.GetLength(); - if (result[length - 1] == '\\') - result[length - 1] = '_'; - - return result; -} - -String InfluxdbWriter::EscapeValue(const Value& value) -{ - if (value.IsObjectType()) { - std::ostringstream os; - os << static_cast(value)->GetValue() << "i"; - return os.str(); - } - - if (value.IsBoolean()) - return value ? "true" : "false"; - - if (value.IsString()) - return "\"" + EscapeKeyOrTagValue(value) + "\""; - - return value; -} - -void InfluxdbWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, - const String& label, const Dictionary::Ptr& fields, double ts) -{ - std::ostringstream msgbuf; - msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement")); - - Dictionary::Ptr tags = tmpl->Get("tags"); - if (tags) { - ObjectLock olock(tags); - for (const Dictionary::Pair& pair : tags) { - // Empty macro expansion, no tag - if (!pair.second.IsEmpty()) { - msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second); - } - } - } - - // Label may be empty in the case of metadata - if (!label.IsEmpty()) - msgbuf << ",metric=" << EscapeKeyOrTagValue(label); - - msgbuf << " "; - - { - bool first = true; - - ObjectLock fieldLock(fields); - for (const Dictionary::Pair& pair : fields) { - if (first) - first = false; - else - msgbuf << ","; - - msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second); - } - } - - msgbuf << " " << static_cast(ts); - - Log(LogDebug, "InfluxdbWriter") - << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'."; - - // Buffer the data point - m_DataBuffer.emplace_back(msgbuf.str()); - - // Flush if we've buffered too much to prevent excessive memory use - if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { - Log(LogDebug, "InfluxdbWriter") - << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; - - try { - Flush(); - } catch (...) { - /* Do nothing. */ - } - } -} - -void InfluxdbWriter::FlushTimeout() -{ - m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); -} - -void InfluxdbWriter::FlushTimeoutWQ() -{ - AssertOnWorkQueue(); - - Log(LogDebug, "InfluxdbWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - - Flush(); -} - -void InfluxdbWriter::Flush() -{ - namespace beast = boost::beast; - namespace http = beast::http; - - /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ - if (m_DataBuffer.empty()) - return; - - Log(LogDebug, "InfluxdbWriter") - << "Flushing data buffer to InfluxDB."; - - String body = boost::algorithm::join(m_DataBuffer, "\n"); - m_DataBuffer.clear(); - - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxDbWriter") - << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - - Url::Ptr url = new Url(); - url->SetScheme(GetSslEnable() ? "https" : "http"); - url->SetHost(GetHost()); - url->SetPort(GetPort()); + auto url (AssembleBaseUrl()); std::vector path; path.emplace_back("write"); url->SetPath(path); url->AddQueryElement("db", GetDatabase()); - url->AddQueryElement("precision", "s"); + if (!GetUsername().IsEmpty()) url->AddQueryElement("u", GetUsername()); if (!GetPassword().IsEmpty()) url->AddQueryElement("p", GetPassword()); - http::request request (http::verb::post, std::string(url->Format(true)), 10); - - request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); - request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); - - { - Dictionary::Ptr basicAuth = GetBasicAuth(); - - if (basicAuth) { - request.set( - http::field::authorization, - "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password")) - ); - } - } - - request.body() = body; - request.content_length(request.body().size()); - - try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw; - } - - auto& response (parser.get()); - - if (response.result() != http::status::no_content) { - Log(LogWarning, "InfluxdbWriter") - << "Unexpected response code: " << response.result(); - - auto& contentType (response[http::field::content_type]); - if (contentType != "application/json") { - Log(LogWarning, "InfluxdbWriter") - << "Unexpected Content-Type: " << contentType; - return; - } - - Dictionary::Ptr jsonResponse; - auto& body (response.body()); - - try { - jsonResponse = JsonDecode(body); - } catch (...) { - Log(LogWarning, "InfluxdbWriter") - << "Unable to parse JSON response:\n" << body; - return; - } - - String error = jsonResponse->Get("error"); - - Log(LogCritical, "InfluxdbWriter") - << "InfluxDB error message:\n" << error; - } + return std::move(url); } - -void InfluxdbWriter::ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) -{ - ObjectImpl::ValidateHostTemplate(lvalue, utils); - - String measurement = lvalue()->Get("measurement"); - if (!MacroProcessor::ValidateMacroString(measurement)) - BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); - - Dictionary::Ptr tags = lvalue()->Get("tags"); - if (tags) { - ObjectLock olock(tags); - for (const Dictionary::Pair& pair : tags) { - if (!MacroProcessor::ValidateMacroString(pair.second)) - BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); - } - } -} - -void InfluxdbWriter::ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) -{ - ObjectImpl::ValidateServiceTemplate(lvalue, utils); - - String measurement = lvalue()->Get("measurement"); - if (!MacroProcessor::ValidateMacroString(measurement)) - BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); - - Dictionary::Ptr tags = lvalue()->Get("tags"); - if (tags) { - ObjectLock olock(tags); - for (const Dictionary::Pair& pair : tags) { - if (!MacroProcessor::ValidateMacroString(pair.second)) - BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); - } - } -} - diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index 1f7ab8309..48676cc97 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -4,19 +4,12 @@ #define INFLUXDBWRITER_H #include "perfdata/influxdbwriter-ti.hpp" -#include "icinga/service.hpp" -#include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" -#include "base/workqueue.hpp" -#include namespace icinga { /** - * An Icinga InfluxDB writer. + * An Icinga InfluxDB v1 writer. * * @ingroup perfdata */ @@ -28,35 +21,9 @@ public: static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); - void ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; - void ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) override; - protected: - void OnConfigLoaded() override; - void Resume() override; - void Pause() override; - -private: - WorkQueue m_WorkQueue{10000000, 1}; - Timer::Ptr m_FlushTimer; - std::vector m_DataBuffer; - - void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, - const String& label, const Dictionary::Ptr& fields, double ts); - void FlushTimeout(); - void FlushTimeoutWQ(); - void Flush(); - - static String EscapeKeyOrTagValue(const String& str); - static String EscapeValue(const Value& value); - - OptionalTlsStream Connect(); - - void AssertOnWorkQueue(); - - void ExceptionHandler(boost::exception_ptr exp); + boost::beast::http::request AssembleRequest(String body) override; + Url::Ptr AssembleUrl() override; }; } diff --git a/lib/perfdata/influxdbwriter.ti b/lib/perfdata/influxdbwriter.ti index 73a340cc3..e6fc84e5c 100644 --- a/lib/perfdata/influxdbwriter.ti +++ b/lib/perfdata/influxdbwriter.ti @@ -1,22 +1,16 @@ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ -#include "base/configobject.hpp" +#include "perfdata/influxdbcommonwriter.hpp" library perfdata; namespace icinga { -class InfluxdbWriter : ConfigObject +class InfluxdbWriter : InfluxdbCommonWriter { activation_priority 100; - [config, required] String host { - default {{{ return "127.0.0.1"; }}} - }; - [config, required] String port { - default {{{ return "8086"; }}} - }; [config, required] String database { default {{{ return "icinga2"; }}} }; @@ -26,72 +20,10 @@ class InfluxdbWriter : ConfigObject [config, no_user_view] String password { default {{{ return ""; }}} }; - [config] bool ssl_enable { - default {{{ return false; }}} - }; - [config] String ssl_ca_cert { - default {{{ return ""; }}} - }; - [config] String ssl_cert { - default {{{ return ""; }}} - }; - [config] String ssl_key{ - default {{{ return ""; }}} - }; [config, no_user_view] Dictionary::Ptr basic_auth; - [config, required] Dictionary::Ptr host_template { - default {{{ - return new Dictionary({ - { "measurement", "$host.check_command$" }, - { "tags", new Dictionary({ - { "hostname", "$host.name$" } - }) } - }); - }}} - }; - [config, required] Dictionary::Ptr service_template { - default {{{ - return new Dictionary({ - { "measurement", "$service.check_command$" }, - { "tags", new Dictionary({ - { "hostname", "$host.name$" }, - { "service", "$service.name$" } - }) } - }); - }}} - }; - [config] bool enable_send_thresholds { - default {{{ return false; }}} - }; - [config] bool enable_send_metadata { - default {{{ return false; }}} - }; - [config] int flush_interval { - default {{{ return 10; }}} - }; - [config] int flush_threshold { - default {{{ return 1024; }}} - }; - [config] bool enable_ha { - default {{{ return false; }}} - }; }; validator InfluxdbWriter { - Dictionary host_template { - required measurement; - String measurement; - Dictionary "tags" { - String "*"; - }; - }; - Dictionary service_template { - required measurement; - String measurement; - Dictionary "tags" { - String "*"; - }; - }; Dictionary basic_auth { required username; String username; From 9d4b0f12687acc4fa83ef3f9e7b36db1dc32f2c0 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 15 Apr 2021 14:08:50 +0200 Subject: [PATCH 2/2] Introduce Influxdb2Writer refs #8711 --- doc/06-distributed-monitoring.md | 2 +- doc/09-object-types.md | 63 +++++++++++++++++++ doc/13-addons.md | 7 +++ doc/14-features.md | 20 ++++-- etc/icinga2/features-available/influxdb2.conf | 27 ++++++++ lib/perfdata/CMakeLists.txt | 7 +++ lib/perfdata/influxdb2writer.cpp | 44 +++++++++++++ lib/perfdata/influxdb2writer.hpp | 33 ++++++++++ lib/perfdata/influxdb2writer.ti | 19 ++++++ tools/syntax/vim/syntax/icinga2.vim | 2 +- 10 files changed, 216 insertions(+), 8 deletions(-) create mode 100644 etc/icinga2/features-available/influxdb2.conf create mode 100644 lib/perfdata/influxdb2writer.cpp create mode 100644 lib/perfdata/influxdb2writer.hpp create mode 100644 lib/perfdata/influxdb2writer.ti diff --git a/doc/06-distributed-monitoring.md b/doc/06-distributed-monitoring.md index 1531451ba..ca47bccb5 100644 --- a/doc/06-distributed-monitoring.md +++ b/doc/06-distributed-monitoring.md @@ -3001,7 +3001,7 @@ By default, the following features provide advanced HA functionality: * [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter) * [Gelf](09-object-types.md#objecttype-gelfwriter) * [Graphite](09-object-types.md#objecttype-graphitewriter) -* [InfluxDB](09-object-types.md#objecttype-influxdbwriter) +* [InfluxDB](09-object-types.md#objecttype-influxdb2writer) (v1 and v2) * [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter) * [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP) diff --git a/doc/09-object-types.md b/doc/09-object-types.md index b87eff813..19e796aff 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1606,6 +1606,7 @@ Runtime Attributes: Writes check result metrics and performance data to a defined InfluxDB v1 host. This configuration object is available as [influxdb feature](14-features.md#influxdb-writer). +For InfluxDB v2 support see the [Influxdb2Writer](#objecttype-influxdb2writer) below. Example: @@ -1669,6 +1670,68 @@ or similar. +### Influxdb2Writer + +Writes check result metrics and performance data to a defined InfluxDB v2 host. +This configuration object is available as [influxdb feature](14-features.md#influxdb-writer). +For InfluxDB v1 support see the [InfluxdbWriter](#objecttype-influxdbwriter) above. + +Example: + +``` +object Influxdb2Writer "influxdb2" { + host = "127.0.0.1" + port = 8086 + organization = "monitoring" + bucket = "icinga2" + auth_token = "ABCDEvwxyz0189-_" + + flush_threshold = 1024 + flush_interval = 10s + + host_template = { + measurement = "$host.check_command$" + tags = { + hostname = "$host.name$" + } + } + service_template = { + measurement = "$service.check_command$" + tags = { + hostname = "$host.name$" + service = "$service.name$" + } + } +} +``` + +Configuration Attributes: + + Name | Type | Description + --------------------------|-----------------------|---------------------------------- + host | String | **Required.** InfluxDB host address. Defaults to `127.0.0.1`. + port | Number | **Required.** InfluxDB HTTP port. Defaults to `8086`. + organization | String | **Required.** InfluxDB organization name. + bucket | String | **Required.** InfluxDB bucket name. + auth\_token | String | **Required.** InfluxDB authentication token. + ssl\_enable | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`. + ssl\_ca\_cert | String | **Optional.** Path to CA certificate to validate the remote host. + ssl\_cert | String | **Optional.** Path to host certificate to present to the remote host for mutual verification. + ssl\_key | String | **Optional.** Path to host key to accompany the ssl\_cert. + host\_template | Dictionary | **Required.** Host template to define the InfluxDB line protocol. + service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. + enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. + enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. + flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. + flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. + enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. + +Note: If `flush_threshold` is set too low, this will always force the feature to flush all data +to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second +or similar. + + + ### LiveStatusListener Livestatus API interface available as TCP or UNIX socket. Historical table queries diff --git a/doc/13-addons.md b/doc/13-addons.md index fd65b4b02..e212c3734 100644 --- a/doc/13-addons.md +++ b/doc/13-addons.md @@ -63,6 +63,13 @@ for sending real-time metrics from Icinga 2 to InfluxDB v1. icinga2 feature enable influxdb ``` +Use the [Influxdb2Writer](14-features.md#influxdb-writer) feature +for sending real-time metrics from Icinga 2 to InfluxDB v2. + +```bash +icinga2 feature enable influxdb2 +``` + A popular frontend for InfluxDB is for example [Grafana](https://grafana.org). Integration in Icinga Web 2 is possible by installing the community [Grafana module](https://github.com/Mikesch-mp/icingaweb2-module-grafana). diff --git a/doc/14-features.md b/doc/14-features.md index 10656331a..5e5eea0a7 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -369,7 +369,7 @@ where Carbon Cache/Relay is running as receiver. ### InfluxDB Writer Once there are new metrics available, Icinga 2 will directly write them to the -defined InfluxDB v1 HTTP API. +defined InfluxDB v1/v2 HTTP API. You can enable the feature using @@ -377,10 +377,17 @@ You can enable the feature using icinga2 feature enable influxdb ``` -By default the [InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter) feature -expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`. +or -Measurement names and tags are fully configurable by the end user. The InfluxdbWriter +```bash +icinga2 feature enable influxdb2 +``` + +By default the +[InfluxdbWriter](09-object-types.md#objecttype-influxdbwriter)/[Influxdb2Writer](09-object-types.md#objecttype-influxdb2writer) +features expect the InfluxDB daemon to listen at `127.0.0.1` on port `8086`. + +Measurement names and tags are fully configurable by the end user. The Influxdb(2)Writer object will automatically add a `metric` tag to each data point. This correlates to the perfdata label. Fields (value, warn, crit, min, max, unit) are created from data if available and the configuration allows it. If a value associated with a tag is not able to be @@ -391,12 +398,13 @@ escape characters when followed by a space or comma, but cannot be escaped thems As a result all trailling slashes in these fields are replaced with an underscore. This predominantly affects Windows paths e.g. `C:\` becomes `C:_`. -The database is assumed to exist so this object will make no attempt to create it currently. +The database/bucket is assumed to exist so this object will make no attempt to create it currently. If [SELinux](22-selinux.md#selinux) is enabled, it will not allow access for Icinga 2 to InfluxDB until the [boolean](22-selinux.md#selinux-policy-booleans) `icinga2_can_connect_all` is set to true as InfluxDB is not providing its own policy. -More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter). +More configuration details can be found [here for v1](09-object-types.md#objecttype-influxdbwriter) +and [here for v2](09-object-types.md#objecttype-influxdb2writer). #### Instance Tagging diff --git a/etc/icinga2/features-available/influxdb2.conf b/etc/icinga2/features-available/influxdb2.conf new file mode 100644 index 000000000..53f7a217b --- /dev/null +++ b/etc/icinga2/features-available/influxdb2.conf @@ -0,0 +1,27 @@ +/** + * The Influxdb2Writer type writes check result metrics and + * performance data to an InfluxDB v2 HTTP API + */ + +object Influxdb2Writer "influxdb2" { + //host = "127.0.0.1" + //port = 8086 + //organization = "monitoring" + //bucket = "icinga2" + //auth_token = "ABCDEvwxyz0189-_" + //flush_threshold = 1024 + //flush_interval = 10s + //host_template = { + // measurement = "$host.check_command$" + // tags = { + // hostname = "$host.name$" + // } + //} + //service_template = { + // measurement = "$service.check_command$" + // tags = { + // hostname = "$host.name$" + // service = "$service.name$" + // } + //} +} diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index 0e54ec832..168938c42 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -4,6 +4,7 @@ mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp) mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp) mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp) mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp) +mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp) mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp) mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp) mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp) @@ -14,6 +15,7 @@ set(perfdata_SOURCES graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp + influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp ) @@ -46,6 +48,11 @@ install_if_not_exists( ${ICINGA2_CONFIGDIR}/features-available ) +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb2.conf + ${ICINGA2_CONFIGDIR}/features-available +) + install_if_not_exists( ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf ${ICINGA2_CONFIGDIR}/features-available diff --git a/lib/perfdata/influxdb2writer.cpp b/lib/perfdata/influxdb2writer.cpp new file mode 100644 index 000000000..57fc94e23 --- /dev/null +++ b/lib/perfdata/influxdb2writer.cpp @@ -0,0 +1,44 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdb2writer.hpp" +#include "perfdata/influxdb2writer-ti.cpp" +#include "remote/url.hpp" +#include "base/configtype.hpp" +#include "base/perfdatavalue.hpp" +#include "base/statsfunction.hpp" +#include +#include +#include + +using namespace icinga; + +REGISTER_TYPE(Influxdb2Writer); + +REGISTER_STATSFUNCTION(Influxdb2Writer, &Influxdb2Writer::StatsFunc); + +void Influxdb2Writer::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + InfluxdbCommonWriter::StatsFunc(status, perfdata); +} + +boost::beast::http::request Influxdb2Writer::AssembleRequest(String body) +{ + auto request (AssembleBaseRequest(std::move(body))); + + request.set(boost::beast::http::field::authorization, "Token " + GetAuthToken()); + + return std::move(request); +} + +Url::Ptr Influxdb2Writer::AssembleUrl() +{ + auto url (AssembleBaseUrl()); + + std::vector path ({"api", "v2", "write"}); + url->SetPath(path); + + url->AddQueryElement("org", GetOrganization()); + url->AddQueryElement("bucket", GetBucket()); + + return std::move(url); +} diff --git a/lib/perfdata/influxdb2writer.hpp b/lib/perfdata/influxdb2writer.hpp new file mode 100644 index 000000000..3b20f8b9d --- /dev/null +++ b/lib/perfdata/influxdb2writer.hpp @@ -0,0 +1,33 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#ifndef INFLUXDB2WRITER_H +#define INFLUXDB2WRITER_H + +#include "perfdata/influxdb2writer-ti.hpp" +#include +#include + +namespace icinga +{ + +/** + * An Icinga InfluxDB v2 writer. + * + * @ingroup perfdata + */ +class Influxdb2Writer final : public ObjectImpl +{ +public: + DECLARE_OBJECT(Influxdb2Writer); + DECLARE_OBJECTNAME(Influxdb2Writer); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + +protected: + boost::beast::http::request AssembleRequest(String body) override; + Url::Ptr AssembleUrl() override; +}; + +} + +#endif /* INFLUXDB2WRITER_H */ diff --git a/lib/perfdata/influxdb2writer.ti b/lib/perfdata/influxdb2writer.ti new file mode 100644 index 000000000..f80618755 --- /dev/null +++ b/lib/perfdata/influxdb2writer.ti @@ -0,0 +1,19 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbcommonwriter.hpp" + +library perfdata; + +namespace icinga +{ + +class Influxdb2Writer : InfluxdbCommonWriter +{ + activation_priority 100; + + [config, required] String organization; + [config, required] String bucket; + [config, required, no_user_view] String auth_token; +}; + +} diff --git a/tools/syntax/vim/syntax/icinga2.vim b/tools/syntax/vim/syntax/icinga2.vim index 59ff202f1..3cbb8e632 100644 --- a/tools/syntax/vim/syntax/icinga2.vim +++ b/tools/syntax/vim/syntax/icinga2.vim @@ -57,7 +57,7 @@ syn keyword icinga2ObjType Comment Dependency Downtime ElasticsearchWriter syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection -syn keyword icinga2ObjType InfluxdbWriter LivestatusListener Notification NotificationCommand +syn keyword icinga2ObjType InfluxdbWriter Influxdb2Writer LivestatusListener Notification NotificationCommand syn keyword icinga2ObjType NotificationComponent OpenTsdbWriter PerfdataWriter syn keyword icinga2ObjType ScheduledDowntime Service ServiceGroup SyslogLogger syn keyword icinga2ObjType TimePeriod User UserGroup WindowsEventLogLogger Zone