Rename ElasticWriter to ElasticsearchWriter

This better reflects its purpose as otherwise it would imply
that you need Elastic Stack for it. Graylog also reads from
Elasticsearch instances, this could serve as additional integration
here.
This commit is contained in:
Michael Friedrich 2017-11-09 14:05:10 +01:00
parent d69c73bbc8
commit 7c0a09cfb6
7 changed files with 84 additions and 84 deletions

View File

@ -470,17 +470,17 @@ Runtime Attributes:
triggered\_by | Object name | The name of the downtime this downtime was triggered by.
## ElasticWriter <a id="objecttype-elasticwriter"></a>
## ElasticsearchWriter <a id="objecttype-elasticsearchwriter"></a>
Writes check result metrics and performance data to an Elasticsearch instance.
This configuration object is available as [elastic feature](14-features.md#elastic-writer).
This configuration object is available as [elasticsearch feature](14-features.md#elasticsearch-writer).
Example:
```
library "perfdata"
object ElasticWriter "elastic" {
object ElasticsearchWriter "elasticsearch" {
host = "127.0.0.1"
port = 9200
index = "icinga2"

View File

@ -338,7 +338,7 @@ More integrations:
* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
#### Elastic Writer <a id="elastic-writer"></a>
#### Elasticsearch Writer <a id="elasticsearch-writer"></a>
This feature forwards check results, state changes and notification events
to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API.
@ -352,13 +352,13 @@ The check results include parsed performance data metrics if enabled.
Enable the feature and restart Icinga 2.
```
# icinga2 feature enable elastic
# icinga2 feature enable elasticsearch
```
The default configuration expects an Elasticsearch instance running on `localhost` on port `9200
and writes to an index called `icinga2`.
More configuration details can be found [here](09-object-types.md#objecttype-elasticwriter).
More configuration details can be found [here](09-object-types.md#objecttype-elasticsearchwriter).
#### Current Elasticsearch Schema <a id="elastic-writer-schema"></a>

View File

@ -1,6 +1,6 @@
library "perfdata"
object ElasticWriter "elastic" {
object ElasticsearchWriter "elasticsearch" {
//host = "127.0.0.1"
//port = 9200
//index = "icinga2"

View File

@ -18,12 +18,12 @@
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
mkclass_target(elasticwriter.ti elasticwriter.tcpp elasticwriter.thpp)
mkclass_target(elasticsearchwriter.ti elasticsearchwriter.tcpp elasticsearchwriter.thpp)
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
set(perfdata_SOURCES
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticwriter.cpp elasticwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticsearchwriter.cpp elasticsearchwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
)
if(ICINGA2_UNITY_BUILD)
@ -58,7 +58,7 @@ install_if_not_exists(
)
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elastic.conf
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
)

View File

@ -17,8 +17,8 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "perfdata/elasticwriter.hpp"
#include "perfdata/elasticwriter.tcpp"
#include "perfdata/elasticsearchwriter.hpp"
#include "perfdata/elasticsearchwriter.tcpp"
#include "remote/url.hpp"
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
@ -38,77 +38,77 @@
using namespace icinga;
REGISTER_TYPE(ElasticWriter);
REGISTER_TYPE(ElasticsearchWriter);
REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc);
REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
ElasticWriter::ElasticWriter(void)
ElasticsearchWriter::ElasticsearchWriter(void)
: m_WorkQueue(10000000, 1)
{ }
void ElasticWriter::OnConfigLoaded(void)
void ElasticsearchWriter::OnConfigLoaded(void)
{
ObjectImpl<ElasticWriter>::OnConfigLoaded();
ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
m_WorkQueue.SetName("ElasticWriter, " + GetName());
m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
}
void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType<ElasticWriter>()) {
size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength();
double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
nodes->Set(elasticwriter->GetName(), stats);
nodes->Set(elasticsearchwriter->GetName(), stats);
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("elasticwriter", nodes);
status->Set("elasticsearchwriter", nodes);
}
void ElasticWriter::Start(bool runtimeCreated)
void ElasticsearchWriter::Start(bool runtimeCreated)
{
ObjectImpl<ElasticWriter>::Start(runtimeCreated);
ObjectImpl<ElasticsearchWriter>::Start(runtimeCreated);
m_EventPrefix = "icinga2.event.";
Log(LogInformation, "ElasticWriter")
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' started.";
m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1));
m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this));
m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticsearchWriter::FlushTimeout, this));
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Register for new metrics. */
Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2));
Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3));
Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
Checkable::OnNewCheckResult.connect(boost::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
Checkable::OnStateChange.connect(boost::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
}
void ElasticWriter::Stop(bool runtimeRemoved)
void ElasticsearchWriter::Stop(bool runtimeRemoved)
{
Log(LogInformation, "ElasticWriter")
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
ObjectImpl<ElasticWriter>::Stop(runtimeRemoved);
ObjectImpl<ElasticsearchWriter>::Stop(runtimeRemoved);
}
void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
String prefix = "check_result.";
@ -145,7 +145,7 @@ void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkabl
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Ignoring invalid perfdata value: '" << val << "' for object '"
<< checkable->GetName() << "'.";
}
@ -173,12 +173,12 @@ void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkabl
}
}
void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr));
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
}
void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
@ -227,12 +227,12 @@ void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable,
Enqueue("checkresult", fields, ts);
}
void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type));
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}
void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
@ -274,15 +274,15 @@ void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable,
Enqueue("statechange", fields, ts);
}
void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this,
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
notification, checkable, users, type, cr, author, text));
}
void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
@ -290,7 +290,7 @@ void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification
CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
@ -340,7 +340,7 @@ void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification
Enqueue("notification", fields, ts);
}
void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
void ElasticsearchWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
{
/* Atomically buffer the data point. */
boost::mutex::scoped_lock lock(m_DataBufferMutex);
@ -359,20 +359,20 @@ void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double t
String indexBody = "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
String fieldsBody = JsonEncode(fields);
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Add to fields to message list: '" << fieldsBody << "'.";
m_DataBuffer.push_back(indexBody + fieldsBody);
/* Flush if we've buffered too much to prevent excessive memory use. */
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void ElasticWriter::FlushTimeout(void)
void ElasticsearchWriter::FlushTimeout(void)
{
/* Prevent new data points from being added to the array, there is a
* race condition where they could disappear.
@ -381,13 +381,13 @@ void ElasticWriter::FlushTimeout(void)
/* Flush if there are any data available. */
if (m_DataBuffer.size() > 0) {
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void ElasticWriter::Flush(void)
void ElasticsearchWriter::Flush(void)
{
/* Ensure you hold a lock against m_DataBuffer so that things
* don't go missing after creating the body and clearing the buffer.
@ -398,7 +398,7 @@ void ElasticWriter::Flush(void)
SendRequest(body);
}
void ElasticWriter::SendRequest(const String& body)
void ElasticsearchWriter::SendRequest(const String& body)
{
Url::Ptr url = new Url();
@ -436,7 +436,7 @@ void ElasticWriter::SendRequest(const String& body)
req.RequestUrl = url;
/* Don't log the request body to debug log, this is already done above. */
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'.";
@ -444,7 +444,7 @@ void ElasticWriter::SendRequest(const String& body)
req.WriteBody(body.CStr(), body.GetLength());
req.Finish();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
@ -455,7 +455,7 @@ void ElasticWriter::SendRequest(const String& body)
try {
resp.Parse(context, true);
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
@ -464,18 +464,18 @@ void ElasticWriter::SendRequest(const String& body)
if (resp.StatusCode == 401) {
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
if (!username.IsEmpty() && !password.IsEmpty()) {
Log(LogCritical, "ElasticWriter")
Log(LogCritical, "ElasticsearchWriter")
<< "401 Unauthorized. Please ensure that the user '" << username
<< "' is able to authenticate against the HTTP API/Proxy.";
} else {
Log(LogCritical, "ElasticWriter")
Log(LogCritical, "ElasticsearchWriter")
<< "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
}
return;
}
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Unexpected response code " << resp.StatusCode;
/* Finish parsing the headers and body. */
@ -485,7 +485,7 @@ void ElasticWriter::SendRequest(const String& body)
String contentType = resp.Headers->Get("content-type");
if (contentType != "application/json") {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}
@ -499,29 +499,29 @@ void ElasticWriter::SendRequest(const String& body)
try {
jsonResponse = JsonDecode(buffer.get());
} catch (...) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Unable to parse JSON response:\n" << buffer.get();
return;
}
String error = jsonResponse->Get("error");
Log(LogCritical, "ElasticWriter")
Log(LogCritical, "ElasticsearchWriter")
<< "Elasticsearch error message:\n" << error;
}
}
Stream::Ptr ElasticWriter::Connect(void)
Stream::Ptr ElasticsearchWriter::Connect(void)
{
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "ElasticWriter")
Log(LogNotice, "ElasticsearchWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
@ -532,7 +532,7 @@ Stream::Ptr ElasticWriter::Connect(void)
try {
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "Unable to create SSL context.";
throw ex;
}
@ -542,7 +542,7 @@ Stream::Ptr ElasticWriter::Connect(void)
try {
tlsStream->Handshake();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticWriter")
Log(LogWarning, "ElasticsearchWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw ex;
}
@ -553,20 +553,20 @@ Stream::Ptr ElasticWriter::Connect(void)
}
}
void ElasticWriter::AssertOnWorkQueue(void)
void ElasticsearchWriter::AssertOnWorkQueue(void)
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
void ElasticWriter::ExceptionHandler(boost::exception_ptr exp)
void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!");
Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
Log(LogDebug, "ElasticWriter")
Log(LogDebug, "ElasticsearchWriter")
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
}
String ElasticWriter::FormatTimestamp(double ts)
String ElasticsearchWriter::FormatTimestamp(double ts)
{
/* The date format must match the default dynamic date detection
* pattern in indexes. This enables applications like Kibana to

View File

@ -17,10 +17,10 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef ELASTICWRITER_H
#define ELASTICWRITER_H
#ifndef ELASTICSEARCHWRITER_H
#define ELASTICSEARCHWRITER_H
#include "perfdata/elasticwriter.thpp"
#include "perfdata/elasticsearchwriter.thpp"
#include "icinga/service.hpp"
#include "base/configobject.hpp"
#include "base/workqueue.hpp"
@ -29,13 +29,13 @@
namespace icinga
{
class ElasticWriter : public ObjectImpl<ElasticWriter>
class ElasticsearchWriter : public ObjectImpl<ElasticsearchWriter>
{
public:
DECLARE_OBJECT(ElasticWriter);
DECLARE_OBJECTNAME(ElasticWriter);
DECLARE_OBJECT(ElasticsearchWriter);
DECLARE_OBJECTNAME(ElasticsearchWriter);
ElasticWriter(void);
ElasticsearchWriter(void);
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
@ -78,4 +78,4 @@ private:
}
#endif /* ELASTICWRITER_H */
#endif /* ELASTICSEARCHWRITER_H */

View File

@ -5,7 +5,7 @@ library perfdata;
namespace icinga
{
class ElasticWriter : ConfigObject
class ElasticsearchWriter : ConfigObject
{
[config, required] String host {
default {{{ return "127.0.0.1"; }}}