Add ElasticWriter

fixes #5538
This commit is contained in:
Jean Flach 2017-09-07 15:11:57 +02:00
parent de51966f52
commit 651379db6f
7 changed files with 767 additions and 9 deletions

View File

@ -969,6 +969,41 @@ is associated with the service:
...
}
## ElasticWriter <a id="objecttype-elasticwriter"></a>
Writes check result metrics and performance data to an Elasticsearch instance.
Example:
library "perfdata"
object ElasticWriter "elastic" {
host = "127.0.0.1"
port = 9200
index = "icinga2"
enable_send_perfdata = true
flush_threshold = 1024
flush_interval = 10
}
The index is rotated daily, as is recommended by Elastic, meaning the index will be renamed to `$index-$d.$M.$y`.
Configuration Attributes:
Name |Description
-----------------------|---------------------------------------------------------------------------------------------------------
host | **Required.** Elasticsearch host address. Defaults to `127.0.0.1`.
port | **Required.** Elasticsearch port. Defaults to `9200`.
index | **Required.** Elasticsearch index name. Defaults to `icinga2`.
enable_send_perfdata | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
flush_interval | **Optional.** How long to buffer data points before transfering to Elasticsearch. Defaults to `10`.
flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
Note: If `flush_threshold` is set too low, this will force the feature to flush all data to Elasticsearch too often.
Experiment with the setting, if you are processing more than 1024 metrics per second or similar.
## LiveStatusListener <a id="objecttype-livestatuslistener"></a>
Livestatus API interface available as TCP or UNIX socket. Historical table queries

View File

@ -264,6 +264,74 @@ expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter).
### Elastic Stack Integration <a id="elastic-stack-integration"></a>
[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data
from the Icinga 2 API and sends it either directly to [Elasticsearch](https://www.elastic.co/products/elasticsearch)
or [Logstash](https://www.elastic.co/products/logstash).
More integrations:
* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
#### Elastic Writer <a id="elastic-writer"></a>
This feature forwards check results, state changes and notification events
to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API.
The check results include parsed performance data metrics if enabled.
> **Note**
>
> Elasticsearch 5.x+ is required.
Enable the feature and restart Icinga 2.
# icinga2 feature enable elastic
The default configuration expects an Elasticsearch instance running on `localhost` on port `9200
and writes to an index called `icinga2`.
More configuration details can be found [here](09-object-types.md#objecttype-elasticwriter).
#### Current Elasticsearch Schema <a id="elastic-writer-schema"></a>
The following event types are written to Elasticsearch:
* icinga2.event.checkresult
* icinga2.event.statechange
* icinga2.event.notification
Performance data metrics must be explicitly enabled with the `enable_send_perfdata`
attribute.
Metric values are stored like this:
check_result.perfdata.<perfdata-label>.value
The following characters are escaped in perfdata labels:
Character | Escaped character
--------------|--------------------------
whitespace | _
\ | _
/ | _
:: | .
Note that perfdata labels may contain dots (`.`) allowing to
add more subsequent levels inside the tree.
`::` adds support for [multi performance labels](http://my-plugin.de/wiki/projects/check_multi/configuration/performance)
and is therefore replaced by `.`.
Icinga 2 automatically adds the following threshold metrics
if existing:
check_result.perfdata.<perfdata-label>.min
check_result.perfdata.<perfdata-label>.max
check_result.perfdata.<perfdata-label>.warn
check_result.perfdata.<perfdata-label>.crit
### Graylog Integration <a id="graylog-integration"></a>
#### GELF Writer <a id="gelfwriter"></a>
@ -288,14 +356,6 @@ Currently these events are processed:
* State changes
* Notifications
### Elastic Stack Integration <a id="elastic-stack-integration"></a>
[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data
from the Icinga 2 API and sends it either directly to Elasticsearch or Logstash.
More integrations in development:
* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
### OpenTSDB Writer <a id="opentsdb-writer"></a>

View File

@ -0,0 +1,10 @@
library "perfdata"
object ElasticWriter "elastic" {
//host = "127.0.0.1"
//port = 9200
//index = "icinga2"
//send_enable_perfdata = false
//flush_threshold = 1024
//flush_interval = 10s
}

View File

@ -18,11 +18,12 @@
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
mkclass_target(elasticwriter.ti elasticwriter.tcpp elasticwriter.thpp)
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
set(perfdata_SOURCES
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticwriter.cpp elasticwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
)
if(ICINGA2_UNITY_BUILD)
@ -56,6 +57,11 @@ install_if_not_exists(
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
)
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elastic.conf
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
)
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available

View File

@ -0,0 +1,535 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "perfdata/elasticwriter.hpp"
#include "perfdata/elasticwriter.tcpp"
#include "remote/url.hpp"
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"
#include "base/tcpsocket.hpp"
#include "base/stream.hpp"
#include "base/json.hpp"
#include "base/utility.hpp"
#include "base/networkstream.hpp"
#include "base/perfdatavalue.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp>
using namespace icinga;
REGISTER_TYPE(ElasticWriter);
REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc);
ElasticWriter::ElasticWriter(void)
: m_WorkQueue(10000000, 1)
{ }
void ElasticWriter::OnConfigLoaded(void)
{
ObjectImpl<ElasticWriter>::OnConfigLoaded();
m_WorkQueue.SetName("ElasticWriter, " + GetName());
}
void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType<ElasticWriter>()) {
size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength();
double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
nodes->Set(elasticwriter->GetName(), stats);
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("elasticwriter", nodes);
}
void ElasticWriter::Start(bool runtimeCreated)
{
ObjectImpl<ElasticWriter>::Start(runtimeCreated);
m_EventPrefix = "icinga2.event.";
Log(LogInformation, "ElasticWriter")
<< "'" << GetName() << "' started.";
m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1));
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this));
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Register for new metrics. */
Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2));
Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3));
Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
}
void ElasticWriter::Stop(bool runtimeRemoved)
{
Log(LogInformation, "ElasticWriter")
<< "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
ObjectImpl<ElasticWriter>::Stop(runtimeRemoved);
}
void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
String prefix = "check_result.";
fields->Set(prefix + "output", cr->GetOutput());
fields->Set(prefix + "check_source", cr->GetCheckSource());
fields->Set(prefix + "exit_status", cr->GetExitStatus());
fields->Set(prefix + "command", cr->GetCommand());
fields->Set(prefix + "state", cr->GetState());
fields->Set(prefix + "vars_before", cr->GetVarsBefore());
fields->Set(prefix + "vars_after", cr->GetVarsAfter());
fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
/* Add extra calculated field. */
fields->Set(prefix + "latency", cr->CalculateLatency());
fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
if (!GetEnableSendPerfdata())
return;
Array::Ptr perfdata = cr->GetPerformanceData();
if (perfdata) {
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>())
pdv = val;
else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "ElasticWriter")
<< "Ignoring invalid perfdata value: '" << val << "' for object '"
<< checkable->GetName() << "'.";
}
}
String escapedKey = pdv->GetLabel();
boost::replace_all(escapedKey, " ", "_");
boost::replace_all(escapedKey, ".", "_");
boost::replace_all(escapedKey, "\\", "_");
boost::algorithm::replace_all(escapedKey, "::", ".");
String perfdataPrefix = prefix + "perfdata." + escapedKey;
fields->Set(perfdataPrefix + ".value", pdv->GetValue());
if (pdv->GetMin())
fields->Set(perfdataPrefix + ".min", pdv->GetMin());
if (pdv->GetMax())
fields->Set(perfdataPrefix + ".max", pdv->GetMax());
if (pdv->GetWarn())
fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
if (pdv->GetCrit())
fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
}
}
}
void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr));
}
void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;
Host::Ptr host;
Service::Ptr service;
boost::tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
if (service) {
fields->Set("service", service->GetShortName());
fields->Set("state", service->GetState());
fields->Set("last_state", service->GetLastState());
fields->Set("last_hard_state", service->GetLastHardState());
} else {
fields->Set("state", host->GetState());
fields->Set("last_state", host->GetLastState());
fields->Set("last_hard_state", host->GetLastHardState());
}
fields->Set("host", host->GetName());
fields->Set("state_type", checkable->GetStateType());
fields->Set("current_check_attempt", checkable->GetCheckAttempt());
fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
fields->Set("reachable", checkable->IsReachable());
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
Enqueue("checkresult", fields, ts);
}
void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}
void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Dictionary::Ptr fields = new Dictionary();
fields->Set("current_check_attempt", checkable->GetCheckAttempt());
fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
fields->Set("host", host->GetName());
if (service) {
fields->Set("service", service->GetShortName());
fields->Set("state", service->GetState());
fields->Set("last_state", service->GetLastState());
fields->Set("last_hard_state", service->GetLastHardState());
} else {
fields->Set("state", host->GetState());
fields->Set("last_state", host->GetLastState());
fields->Set("last_hard_state", host->GetLastHardState());
}
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
Enqueue("statechange", fields, ts);
}
void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this,
notification, checkable, users, type, cr, author, text));
}
void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
AssertOnWorkQueue();
CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
Log(LogDebug, "ElasticWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
String notificationTypeString = Notification::NotificationTypeToString(type);
Dictionary::Ptr fields = new Dictionary();
if (service) {
fields->Set("service", service->GetShortName());
fields->Set("state", service->GetState());
fields->Set("last_state", service->GetLastState());
fields->Set("last_hard_state", service->GetLastHardState());
} else {
fields->Set("state", host->GetState());
fields->Set("last_state", host->GetLastState());
fields->Set("last_hard_state", host->GetLastHardState());
}
fields->Set("host", host->GetName());
Array::Ptr userNames = new Array();
for (const User::Ptr& user : users) {
userNames->Add(user->GetName());
}
fields->Set("users", userNames);
fields->Set("notification_type", notificationTypeString);
fields->Set("author", author);
fields->Set("text", text);
CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
if (commandObj)
fields->Set("check_command", commandObj->GetName());
double ts = Utility::GetTime();
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
Enqueue("notification", fields, ts);
}
void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
{
/* Atomically buffer the data point. */
boost::mutex::scoped_lock lock(m_DataBufferMutex);
/* Format the timestamps to dynamically select the date datatype inside the index. */
fields->Set("@timestamp", FormatTimestamp(ts));
fields->Set("timestamp", FormatTimestamp(ts));
String eventType = m_EventPrefix + type;
fields->Set("type", eventType);
/* Every payload needs a line describing the index above.
* We do it this way to avoid problems with a near full queue.
*/
String data;
data += "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
data += JsonEncode(fields);
m_DataBuffer.push_back(data);
/* Flush if we've buffered too much to prevent excessive memory use. */
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
Log(LogDebug, "ElasticWriter")
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void ElasticWriter::FlushTimeout(void)
{
/* Prevent new data points from being added to the array, there is a
* race condition where they could disappear.
*/
boost::mutex::scoped_lock lock(m_DataBufferMutex);
/* Flush if there are any data available. */
if (m_DataBuffer.size() > 0) {
Log(LogDebug, "ElasticWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void ElasticWriter::Flush(void)
{
/* Ensure you hold a lock against m_DataBuffer so that things
* don't go missing after creating the body and clearing the buffer.
*/
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
SendRequest(body);
}
void ElasticWriter::SendRequest(const String& body)
{
Url::Ptr url = new Url();
url->SetScheme("http");
url->SetHost(GetHost());
url->SetPort(GetPort());
std::vector<String> path;
/* Specify the index path. Best practice is a daily rotation.
* Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
*/
path.push_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
/* Use the bulk message format. */
path.push_back("_bulk");
url->SetPath(path);
Stream::Ptr stream = Connect();
HttpRequest req(stream);
/* Specify required headers by Elasticsearch. */
req.AddHeader("Accept", "application/json");
req.AddHeader("Content-Type", "application/json");
req.RequestMethod = "POST";
req.RequestUrl = url;
#ifdef I2_DEBUG /* I2_DEBUG */
Log(LogDebug, "ElasticWriter")
<< "Sending body: " << body;
#endif /* I2_DEBUG */
try {
req.WriteBody(body.CStr(), body.GetLength());
req.Finish();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
HttpResponse resp(stream, req);
StreamReadContext context;
try {
resp.Parse(context, true);
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
<< "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
if (resp.StatusCode > 299) {
Log(LogWarning, "ElasticWriter")
<< "Unexpected response code " << resp.StatusCode;
/* Finish parsing the headers and body. */
while (!resp.Complete)
resp.Parse(context, true);
String contentType = resp.Headers->Get("content-type");
if (contentType != "application/json") {
Log(LogWarning, "ElasticWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}
size_t responseSize = resp.GetBodySize();
boost::scoped_array<char> buffer(new char[responseSize + 1]);
resp.ReadBody(buffer.get(), responseSize);
buffer.get()[responseSize] = '\0';
Dictionary::Ptr jsonResponse;
try {
jsonResponse = JsonDecode(buffer.get());
} catch (...) {
Log(LogWarning, "ElasticWriter")
<< "Unable to parse JSON response:\n" << buffer.get();
return;
}
String error = jsonResponse->Get("error");
Log(LogCritical, "ElasticWriter")
<< "Elasticsearch error message:\n" << error;
}
}
Stream::Ptr ElasticWriter::Connect(void)
{
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "ElasticWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
return new NetworkStream(socket);
}
void ElasticWriter::AssertOnWorkQueue(void)
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
void ElasticWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!");
Log(LogDebug, "ElasticWriter")
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
}
String ElasticWriter::FormatTimestamp(double ts)
{
/* The date format must match the default dynamic date detection
* pattern in indexes. This enables applications like Kibana to
* detect a qualified timestamp index for time-series data.
*
* Example: 2017-09-11T10:56:21.463+0200
*
* References:
* https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
* https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
* https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
*/
int milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + String(milliSeconds) + Utility::FormatDateTime("%z", ts);
}

View File

@ -0,0 +1,81 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef ELASTICWRITER_H
#define ELASTICWRITER_H
#include "perfdata/elasticwriter.thpp"
#include "icinga/service.hpp"
#include "base/configobject.hpp"
#include "base/workqueue.hpp"
#include "base/timer.hpp"
namespace icinga
{
class ElasticWriter : public ObjectImpl<ElasticWriter>
{
public:
DECLARE_OBJECT(ElasticWriter);
DECLARE_OBJECTNAME(ElasticWriter);
ElasticWriter(void);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
static String FormatTimestamp(double ts);
protected:
virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
String m_EventPrefix;
WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
boost::mutex m_DataBufferMutex;
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void Enqueue(String type, const Dictionary::Ptr& fields, double ts);
Stream::Ptr Connect(void);
void AssertOnWorkQueue(void);
void ExceptionHandler(boost::exception_ptr exp);
void FlushTimeout(void);
void Flush(void);
void SendRequest(const String& body);
};
}
#endif /* ELASTICWRITER_H */

View File

@ -0,0 +1,31 @@
#include "base/configobject.hpp"
library perfdata;
namespace icinga
{
class ElasticWriter : ConfigObject
{
[config, required] String host {
default {{{ return "127.0.0.1"; }}}
};
[config, required] String port {
default {{{ return "9200"; }}}
};
[config, required] String index {
default {{{ return "icinga2"; }}}
};
[config] bool enable_send_perfdata {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 10; }}}
};
[config] int flush_threshold {
default {{{ return 1024; }}}
};
};
}