mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 22:54:57 +02:00
Merge pull request #5739 from Icinga/feature/elasticsearchwriter
Rename ElasticWriter to ElasticsearchWriter
This commit is contained in:
commit
166ecfe446
@ -470,17 +470,17 @@ Runtime Attributes:
|
|||||||
triggered\_by | Object name | The name of the downtime this downtime was triggered by.
|
triggered\_by | Object name | The name of the downtime this downtime was triggered by.
|
||||||
|
|
||||||
|
|
||||||
## ElasticWriter <a id="objecttype-elasticwriter"></a>
|
## ElasticsearchWriter <a id="objecttype-elasticsearchwriter"></a>
|
||||||
|
|
||||||
Writes check result metrics and performance data to an Elasticsearch instance.
|
Writes check result metrics and performance data to an Elasticsearch instance.
|
||||||
This configuration object is available as [elastic feature](14-features.md#elastic-writer).
|
This configuration object is available as [elasticsearch feature](14-features.md#elasticsearch-writer).
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
|
||||||
```
|
```
|
||||||
library "perfdata"
|
library "perfdata"
|
||||||
|
|
||||||
object ElasticWriter "elastic" {
|
object ElasticsearchWriter "elasticsearch" {
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
port = 9200
|
port = 9200
|
||||||
index = "icinga2"
|
index = "icinga2"
|
||||||
|
@ -338,7 +338,7 @@ More integrations:
|
|||||||
* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
|
* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
|
||||||
* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
|
* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
|
||||||
|
|
||||||
#### Elastic Writer <a id="elastic-writer"></a>
|
#### Elasticsearch Writer <a id="elasticsearch-writer"></a>
|
||||||
|
|
||||||
This feature forwards check results, state changes and notification events
|
This feature forwards check results, state changes and notification events
|
||||||
to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API.
|
to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API.
|
||||||
@ -352,13 +352,13 @@ The check results include parsed performance data metrics if enabled.
|
|||||||
Enable the feature and restart Icinga 2.
|
Enable the feature and restart Icinga 2.
|
||||||
|
|
||||||
```
|
```
|
||||||
# icinga2 feature enable elastic
|
# icinga2 feature enable elasticsearch
|
||||||
```
|
```
|
||||||
|
|
||||||
The default configuration expects an Elasticsearch instance running on `localhost` on port `9200
|
The default configuration expects an Elasticsearch instance running on `localhost` on port `9200
|
||||||
and writes to an index called `icinga2`.
|
and writes to an index called `icinga2`.
|
||||||
|
|
||||||
More configuration details can be found [here](09-object-types.md#objecttype-elasticwriter).
|
More configuration details can be found [here](09-object-types.md#objecttype-elasticsearchwriter).
|
||||||
|
|
||||||
#### Current Elasticsearch Schema <a id="elastic-writer-schema"></a>
|
#### Current Elasticsearch Schema <a id="elastic-writer-schema"></a>
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
library "perfdata"
|
library "perfdata"
|
||||||
|
|
||||||
object ElasticWriter "elastic" {
|
object ElasticsearchWriter "elasticsearch" {
|
||||||
//host = "127.0.0.1"
|
//host = "127.0.0.1"
|
||||||
//port = 9200
|
//port = 9200
|
||||||
//index = "icinga2"
|
//index = "icinga2"
|
@ -18,12 +18,12 @@
|
|||||||
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
|
mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
|
||||||
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
|
mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
|
||||||
mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
|
mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
|
||||||
mkclass_target(elasticwriter.ti elasticwriter.tcpp elasticwriter.thpp)
|
mkclass_target(elasticsearchwriter.ti elasticsearchwriter.tcpp elasticsearchwriter.thpp)
|
||||||
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
|
mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
|
||||||
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
|
mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
|
||||||
|
|
||||||
set(perfdata_SOURCES
|
set(perfdata_SOURCES
|
||||||
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticwriter.cpp elasticwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
|
gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticsearchwriter.cpp elasticsearchwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
|
||||||
)
|
)
|
||||||
|
|
||||||
if(ICINGA2_UNITY_BUILD)
|
if(ICINGA2_UNITY_BUILD)
|
||||||
@ -58,7 +58,7 @@ install_if_not_exists(
|
|||||||
)
|
)
|
||||||
|
|
||||||
install_if_not_exists(
|
install_if_not_exists(
|
||||||
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elastic.conf
|
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf
|
||||||
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,8 +17,8 @@
|
|||||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
#include "perfdata/elasticwriter.hpp"
|
#include "perfdata/elasticsearchwriter.hpp"
|
||||||
#include "perfdata/elasticwriter.tcpp"
|
#include "perfdata/elasticsearchwriter.tcpp"
|
||||||
#include "remote/url.hpp"
|
#include "remote/url.hpp"
|
||||||
#include "remote/httprequest.hpp"
|
#include "remote/httprequest.hpp"
|
||||||
#include "remote/httpresponse.hpp"
|
#include "remote/httpresponse.hpp"
|
||||||
@ -38,77 +38,77 @@
|
|||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
REGISTER_TYPE(ElasticWriter);
|
REGISTER_TYPE(ElasticsearchWriter);
|
||||||
|
|
||||||
REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc);
|
REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
|
||||||
|
|
||||||
ElasticWriter::ElasticWriter(void)
|
ElasticsearchWriter::ElasticsearchWriter(void)
|
||||||
: m_WorkQueue(10000000, 1)
|
: m_WorkQueue(10000000, 1)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
void ElasticWriter::OnConfigLoaded(void)
|
void ElasticsearchWriter::OnConfigLoaded(void)
|
||||||
{
|
{
|
||||||
ObjectImpl<ElasticWriter>::OnConfigLoaded();
|
ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
|
||||||
|
|
||||||
m_WorkQueue.SetName("ElasticWriter, " + GetName());
|
m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
|
void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
|
||||||
{
|
{
|
||||||
Dictionary::Ptr nodes = new Dictionary();
|
Dictionary::Ptr nodes = new Dictionary();
|
||||||
|
|
||||||
for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType<ElasticWriter>()) {
|
for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
|
||||||
size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength();
|
size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
|
||||||
double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||||
|
|
||||||
Dictionary::Ptr stats = new Dictionary();
|
Dictionary::Ptr stats = new Dictionary();
|
||||||
stats->Set("work_queue_items", workQueueItems);
|
stats->Set("work_queue_items", workQueueItems);
|
||||||
stats->Set("work_queue_item_rate", workQueueItemRate);
|
stats->Set("work_queue_item_rate", workQueueItemRate);
|
||||||
|
|
||||||
nodes->Set(elasticwriter->GetName(), stats);
|
nodes->Set(elasticsearchwriter->GetName(), stats);
|
||||||
|
|
||||||
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems));
|
perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
|
||||||
perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
|
perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
|
||||||
}
|
}
|
||||||
|
|
||||||
status->Set("elasticwriter", nodes);
|
status->Set("elasticsearchwriter", nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::Start(bool runtimeCreated)
|
void ElasticsearchWriter::Start(bool runtimeCreated)
|
||||||
{
|
{
|
||||||
ObjectImpl<ElasticWriter>::Start(runtimeCreated);
|
ObjectImpl<ElasticsearchWriter>::Start(runtimeCreated);
|
||||||
|
|
||||||
m_EventPrefix = "icinga2.event.";
|
m_EventPrefix = "icinga2.event.";
|
||||||
|
|
||||||
Log(LogInformation, "ElasticWriter")
|
Log(LogInformation, "ElasticsearchWriter")
|
||||||
<< "'" << GetName() << "' started.";
|
<< "'" << GetName() << "' started.";
|
||||||
|
|
||||||
m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1));
|
m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
|
||||||
|
|
||||||
/* Setup timer for periodically flushing m_DataBuffer */
|
/* Setup timer for periodically flushing m_DataBuffer */
|
||||||
m_FlushTimer = new Timer();
|
m_FlushTimer = new Timer();
|
||||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||||
m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this));
|
m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticsearchWriter::FlushTimeout, this));
|
||||||
m_FlushTimer->Start();
|
m_FlushTimer->Start();
|
||||||
m_FlushTimer->Reschedule(0);
|
m_FlushTimer->Reschedule(0);
|
||||||
|
|
||||||
/* Register for new metrics. */
|
/* Register for new metrics. */
|
||||||
Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2));
|
Checkable::OnNewCheckResult.connect(boost::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
|
||||||
Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3));
|
Checkable::OnStateChange.connect(boost::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
|
||||||
Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::Stop(bool runtimeRemoved)
|
void ElasticsearchWriter::Stop(bool runtimeRemoved)
|
||||||
{
|
{
|
||||||
Log(LogInformation, "ElasticWriter")
|
Log(LogInformation, "ElasticsearchWriter")
|
||||||
<< "'" << GetName() << "' stopped.";
|
<< "'" << GetName() << "' stopped.";
|
||||||
|
|
||||||
m_WorkQueue.Join();
|
m_WorkQueue.Join();
|
||||||
|
|
||||||
ObjectImpl<ElasticWriter>::Stop(runtimeRemoved);
|
ObjectImpl<ElasticsearchWriter>::Stop(runtimeRemoved);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||||
{
|
{
|
||||||
String prefix = "check_result.";
|
String prefix = "check_result.";
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkabl
|
|||||||
try {
|
try {
|
||||||
pdv = PerfdataValue::Parse(val);
|
pdv = PerfdataValue::Parse(val);
|
||||||
} catch (const std::exception&) {
|
} catch (const std::exception&) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Ignoring invalid perfdata value: '" << val << "' for object '"
|
<< "Ignoring invalid perfdata value: '" << val << "' for object '"
|
||||||
<< checkable->GetName() << "'.";
|
<< checkable->GetName() << "'.";
|
||||||
}
|
}
|
||||||
@ -173,12 +173,12 @@ void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkabl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr));
|
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
@ -227,12 +227,12 @@ void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable,
|
|||||||
Enqueue("checkresult", fields, ts);
|
Enqueue("checkresult", fields, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type));
|
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
@ -274,15 +274,15 @@ void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable,
|
|||||||
Enqueue("statechange", fields, ts);
|
Enqueue("statechange", fields, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
|
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
|
||||||
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
|
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
|
||||||
const CheckResult::Ptr& cr, const String& author, const String& text)
|
const CheckResult::Ptr& cr, const String& author, const String& text)
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this,
|
m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
|
||||||
notification, checkable, users, type, cr, author, text));
|
notification, checkable, users, type, cr, author, text));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
|
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
|
||||||
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
|
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
|
||||||
const CheckResult::Ptr& cr, const String& author, const String& text)
|
const CheckResult::Ptr& cr, const String& author, const String& text)
|
||||||
{
|
{
|
||||||
@ -290,7 +290,7 @@ void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification
|
|||||||
|
|
||||||
CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
|
CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
|
||||||
|
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Processing notification for '" << checkable->GetName() << "'";
|
<< "Processing notification for '" << checkable->GetName() << "'";
|
||||||
|
|
||||||
Host::Ptr host;
|
Host::Ptr host;
|
||||||
@ -340,7 +340,7 @@ void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification
|
|||||||
Enqueue("notification", fields, ts);
|
Enqueue("notification", fields, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
|
void ElasticsearchWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
|
||||||
{
|
{
|
||||||
/* Atomically buffer the data point. */
|
/* Atomically buffer the data point. */
|
||||||
boost::mutex::scoped_lock lock(m_DataBufferMutex);
|
boost::mutex::scoped_lock lock(m_DataBufferMutex);
|
||||||
@ -359,20 +359,20 @@ void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double t
|
|||||||
String indexBody = "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
|
String indexBody = "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
|
||||||
String fieldsBody = JsonEncode(fields);
|
String fieldsBody = JsonEncode(fields);
|
||||||
|
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Add to fields to message list: '" << fieldsBody << "'.";
|
<< "Add to fields to message list: '" << fieldsBody << "'.";
|
||||||
|
|
||||||
m_DataBuffer.push_back(indexBody + fieldsBody);
|
m_DataBuffer.push_back(indexBody + fieldsBody);
|
||||||
|
|
||||||
/* Flush if we've buffered too much to prevent excessive memory use. */
|
/* Flush if we've buffered too much to prevent excessive memory use. */
|
||||||
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
|
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
|
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
|
||||||
Flush();
|
Flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::FlushTimeout(void)
|
void ElasticsearchWriter::FlushTimeout(void)
|
||||||
{
|
{
|
||||||
/* Prevent new data points from being added to the array, there is a
|
/* Prevent new data points from being added to the array, there is a
|
||||||
* race condition where they could disappear.
|
* race condition where they could disappear.
|
||||||
@ -381,13 +381,13 @@ void ElasticWriter::FlushTimeout(void)
|
|||||||
|
|
||||||
/* Flush if there are any data available. */
|
/* Flush if there are any data available. */
|
||||||
if (m_DataBuffer.size() > 0) {
|
if (m_DataBuffer.size() > 0) {
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||||
Flush();
|
Flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::Flush(void)
|
void ElasticsearchWriter::Flush(void)
|
||||||
{
|
{
|
||||||
/* Ensure you hold a lock against m_DataBuffer so that things
|
/* Ensure you hold a lock against m_DataBuffer so that things
|
||||||
* don't go missing after creating the body and clearing the buffer.
|
* don't go missing after creating the body and clearing the buffer.
|
||||||
@ -398,7 +398,7 @@ void ElasticWriter::Flush(void)
|
|||||||
SendRequest(body);
|
SendRequest(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::SendRequest(const String& body)
|
void ElasticsearchWriter::SendRequest(const String& body)
|
||||||
{
|
{
|
||||||
Url::Ptr url = new Url();
|
Url::Ptr url = new Url();
|
||||||
|
|
||||||
@ -436,7 +436,7 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
req.RequestUrl = url;
|
req.RequestUrl = url;
|
||||||
|
|
||||||
/* Don't log the request body to debug log, this is already done above. */
|
/* Don't log the request body to debug log, this is already done above. */
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
|
<< "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
|
||||||
<< " to '" << url->Format() << "'.";
|
<< " to '" << url->Format() << "'.";
|
||||||
|
|
||||||
@ -444,7 +444,7 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
req.WriteBody(body.CStr(), body.GetLength());
|
req.WriteBody(body.CStr(), body.GetLength());
|
||||||
req.Finish();
|
req.Finish();
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
@ -455,7 +455,7 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
try {
|
try {
|
||||||
resp.Parse(context, true);
|
resp.Parse(context, true);
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
@ -464,18 +464,18 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
if (resp.StatusCode == 401) {
|
if (resp.StatusCode == 401) {
|
||||||
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
|
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
|
||||||
if (!username.IsEmpty() && !password.IsEmpty()) {
|
if (!username.IsEmpty() && !password.IsEmpty()) {
|
||||||
Log(LogCritical, "ElasticWriter")
|
Log(LogCritical, "ElasticsearchWriter")
|
||||||
<< "401 Unauthorized. Please ensure that the user '" << username
|
<< "401 Unauthorized. Please ensure that the user '" << username
|
||||||
<< "' is able to authenticate against the HTTP API/Proxy.";
|
<< "' is able to authenticate against the HTTP API/Proxy.";
|
||||||
} else {
|
} else {
|
||||||
Log(LogCritical, "ElasticWriter")
|
Log(LogCritical, "ElasticsearchWriter")
|
||||||
<< "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
|
<< "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Unexpected response code " << resp.StatusCode;
|
<< "Unexpected response code " << resp.StatusCode;
|
||||||
|
|
||||||
/* Finish parsing the headers and body. */
|
/* Finish parsing the headers and body. */
|
||||||
@ -485,7 +485,7 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
String contentType = resp.Headers->Get("content-type");
|
String contentType = resp.Headers->Get("content-type");
|
||||||
|
|
||||||
if (contentType != "application/json") {
|
if (contentType != "application/json") {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Unexpected Content-Type: " << contentType;
|
<< "Unexpected Content-Type: " << contentType;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -499,29 +499,29 @@ void ElasticWriter::SendRequest(const String& body)
|
|||||||
try {
|
try {
|
||||||
jsonResponse = JsonDecode(buffer.get());
|
jsonResponse = JsonDecode(buffer.get());
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Unable to parse JSON response:\n" << buffer.get();
|
<< "Unable to parse JSON response:\n" << buffer.get();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String error = jsonResponse->Get("error");
|
String error = jsonResponse->Get("error");
|
||||||
|
|
||||||
Log(LogCritical, "ElasticWriter")
|
Log(LogCritical, "ElasticsearchWriter")
|
||||||
<< "Elasticsearch error message:\n" << error;
|
<< "Elasticsearch error message:\n" << error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream::Ptr ElasticWriter::Connect(void)
|
Stream::Ptr ElasticsearchWriter::Connect(void)
|
||||||
{
|
{
|
||||||
TcpSocket::Ptr socket = new TcpSocket();
|
TcpSocket::Ptr socket = new TcpSocket();
|
||||||
|
|
||||||
Log(LogNotice, "ElasticWriter")
|
Log(LogNotice, "ElasticsearchWriter")
|
||||||
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
socket->Connect(GetHost(), GetPort());
|
socket->Connect(GetHost(), GetPort());
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
@ -532,7 +532,7 @@ Stream::Ptr ElasticWriter::Connect(void)
|
|||||||
try {
|
try {
|
||||||
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "Unable to create SSL context.";
|
<< "Unable to create SSL context.";
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
@ -542,7 +542,7 @@ Stream::Ptr ElasticWriter::Connect(void)
|
|||||||
try {
|
try {
|
||||||
tlsStream->Handshake();
|
tlsStream->Handshake();
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ElasticWriter")
|
Log(LogWarning, "ElasticsearchWriter")
|
||||||
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
|
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
@ -553,20 +553,20 @@ Stream::Ptr ElasticWriter::Connect(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::AssertOnWorkQueue(void)
|
void ElasticsearchWriter::AssertOnWorkQueue(void)
|
||||||
{
|
{
|
||||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ElasticWriter::ExceptionHandler(boost::exception_ptr exp)
|
void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
|
||||||
{
|
{
|
||||||
Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!");
|
Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
|
||||||
|
|
||||||
Log(LogDebug, "ElasticWriter")
|
Log(LogDebug, "ElasticsearchWriter")
|
||||||
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
|
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
|
||||||
}
|
}
|
||||||
|
|
||||||
String ElasticWriter::FormatTimestamp(double ts)
|
String ElasticsearchWriter::FormatTimestamp(double ts)
|
||||||
{
|
{
|
||||||
/* The date format must match the default dynamic date detection
|
/* The date format must match the default dynamic date detection
|
||||||
* pattern in indexes. This enables applications like Kibana to
|
* pattern in indexes. This enables applications like Kibana to
|
@ -17,10 +17,10 @@
|
|||||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
#ifndef ELASTICWRITER_H
|
#ifndef ELASTICSEARCHWRITER_H
|
||||||
#define ELASTICWRITER_H
|
#define ELASTICSEARCHWRITER_H
|
||||||
|
|
||||||
#include "perfdata/elasticwriter.thpp"
|
#include "perfdata/elasticsearchwriter.thpp"
|
||||||
#include "icinga/service.hpp"
|
#include "icinga/service.hpp"
|
||||||
#include "base/configobject.hpp"
|
#include "base/configobject.hpp"
|
||||||
#include "base/workqueue.hpp"
|
#include "base/workqueue.hpp"
|
||||||
@ -29,13 +29,13 @@
|
|||||||
namespace icinga
|
namespace icinga
|
||||||
{
|
{
|
||||||
|
|
||||||
class ElasticWriter : public ObjectImpl<ElasticWriter>
|
class ElasticsearchWriter : public ObjectImpl<ElasticsearchWriter>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DECLARE_OBJECT(ElasticWriter);
|
DECLARE_OBJECT(ElasticsearchWriter);
|
||||||
DECLARE_OBJECTNAME(ElasticWriter);
|
DECLARE_OBJECTNAME(ElasticsearchWriter);
|
||||||
|
|
||||||
ElasticWriter(void);
|
ElasticsearchWriter(void);
|
||||||
|
|
||||||
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
|
||||||
|
|
||||||
@ -78,4 +78,4 @@ private:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif /* ELASTICWRITER_H */
|
#endif /* ELASTICSEARCHWRITER_H */
|
@ -5,7 +5,7 @@ library perfdata;
|
|||||||
namespace icinga
|
namespace icinga
|
||||||
{
|
{
|
||||||
|
|
||||||
class ElasticWriter : ConfigObject
|
class ElasticsearchWriter : ConfigObject
|
||||||
{
|
{
|
||||||
[config, required] String host {
|
[config, required] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
Loading…
x
Reference in New Issue
Block a user