Merge pull request #6725 from Icinga/feature/ha-features

Enable HA for features: Elasticsearch, Gelf, Graphite, InfluxDB, OpenTSDB, Perfdata
Michael Friedrich 2018-10-24 16:10:00 +02:00 committed by GitHub
commit 0aee27d95c
22 changed files with 290 additions and 62 deletions


@ -2434,6 +2434,12 @@ By default, the following features provide advanced HA functionality:
* [Checks](06-distributed-monitoring.md#distributed-monitoring-high-availability-checks) (load balanced, automated failover).
* [Notifications](06-distributed-monitoring.md#distributed-monitoring-high-availability-notifications) (load balanced, automated failover).
* [DB IDO](06-distributed-monitoring.md#distributed-monitoring-high-availability-db-ido) (Run-Once, automated failover).
* [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter)
* [Gelf](09-object-types.md#objecttype-gelfwriter)
* [Graphite](09-object-types.md#objecttype-graphitewriter)
* [InfluxDB](09-object-types.md#objecttype-influxdbwriter)
* [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter)
* [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP)
#### High-Availability with Checks <a id="distributed-monitoring-high-availability-checks"></a>


@ -517,6 +517,7 @@ Configuration Attributes:
ca\_path | String | **Optional.** Path to CA certificate to validate the remote host. Requires `enable_tls` set to `true`.
cert\_path | String | **Optional.** Path to host certificate to present to the remote host for mutual verification. Requires `enable_tls` set to `true`.
key\_path | String | **Optional.** Path to host key to accompany the cert\_path. Requires `enable_tls` set to `true`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
Note: If `flush_threshold` is set too low, this will force the feature to flush all data to Elasticsearch too often.
Experiment with the setting if you are processing more than 1024 metrics per second or similar.
@ -655,6 +656,7 @@ Configuration Attributes:
port | Number | **Optional.** GELF receiver port. Defaults to `12201`.
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
## GraphiteWriter <a id="objecttype-graphitewriter"></a>
@ -682,6 +684,7 @@ Configuration Attributes:
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
enable\_send\_metadata | Boolean | **Optional.** Send additional metadata metrics. Defaults to `false`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
Additional usage examples can be found [here](14-features.md#graphite-carbon-cache-writer).
@ -868,7 +871,7 @@ Configuration Attributes:
table\_prefix | String | **Optional.** MySQL database table prefix. Defaults to `icinga_`.
instance\_name | String | **Optional.** Unique identifier for the local Icinga 2 instance. Defaults to `default`.
instance\_description | String | **Optional.** Description for the Icinga 2 instance.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-db-ido). Defaults to "true".
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-db-ido). Defaults to `true`.
failover\_timeout | Duration | **Optional.** Set the failover timeout in a [HA cluster](06-distributed-monitoring.md#distributed-monitoring-high-availability-db-ido). Must not be lower than 60s. Defaults to `60s`.
cleanup | Dictionary | **Optional.** Dictionary with items for historical table cleanup.
categories | Array | **Optional.** Array of information types that should be written to the database.
@ -1057,6 +1060,7 @@ Configuration Attributes:
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
to InfluxDB. Experiment with the setting if you are processing more than 1024 metrics per second
@ -1303,7 +1307,7 @@ Example:
object OpenTsdbWriter "opentsdb" {
host = "127.0.0.1"
port = 4242
}
```
Configuration Attributes:
@ -1312,6 +1316,7 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
## PerfdataWriter <a id="objecttype-perfdatawriter"></a>
@ -1346,6 +1351,7 @@ Configuration Attributes:
host\_format\_template | String | **Optional.** Host Format template for the performance data file. Defaults to a template that's suitable for use with PNP4Nagios.
service\_format\_template | String | **Optional.** Service Format template for the performance data file. Defaults to a template that's suitable for use with PNP4Nagios.
rotation\_interval | Duration | **Optional.** Rotation interval for the files specified in `{host,service}_perfdata_path`. Defaults to `30s`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `true`.
When rotating the performance data file, the current UNIX timestamp is appended to the path specified
in `host_perfdata_path` and `service_perfdata_path` to generate a unique filename.


@ -7,6 +7,26 @@ Specific version upgrades are described below. Please note that version
updates are incremental. An upgrade from v2.6 to v2.8 requires following
the instructions for v2.7 too.
## Upgrading to v2.11 <a id="upgrading-to-2-11"></a>
### HA-aware Features <a id="upgrading-to-2-11-ha-aware-features"></a>
v2.11 introduces additional HA functionality similar to the DB IDO feature.
This allows a feature to be active on only one endpoint while the other
endpoint is paused. When the active endpoint is shut down, automatic failover happens.
This functionality is enabled by default. If you need a feature to stay active on both
endpoints at the same time, set `enable_ha = false` to restore the old behaviour.
This affects the following features:
* [Elasticsearch](09-object-types.md#objecttype-elasticsearchwriter)
* [Gelf](09-object-types.md#objecttype-gelfwriter)
* [Graphite](09-object-types.md#objecttype-graphitewriter)
* [InfluxDB](09-object-types.md#objecttype-influxdbwriter)
* [OpenTsdb](09-object-types.md#objecttype-opentsdbwriter)
* [Perfdata](09-object-types.md#objecttype-perfdatawriter) (for PNP)
## Upgrading to v2.10 <a id="upgrading-to-2-10"></a>
### Path Constant Changes <a id="upgrading-to-2-10-path-constant-changes"></a>


@ -190,10 +190,12 @@ The GraphiteWriter feature calls the registered function and processes
the received data. Features which connect Icinga 2 to external interfaces
normally parse and reformat the received data into an applicable format.
Since this check result signal is blocking, many of the features include a work queue
with asynchronous task handling.
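The following standalone sketch (not the actual Icinga 2 `WorkQueue` implementation; class and function names are illustrative) shows this pattern: the blocking check result signal handler only enqueues a task, while a worker thread performs formatting and network I/O asynchronously.

```
// Simplified work queue: the signal handler returns immediately,
// heavy work is done on a separate worker thread.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class WorkQueue {
public:
	void Enqueue(std::function<void()> task) {
		{
			std::lock_guard<std::mutex> lock(m_Mutex);
			m_Tasks.push(std::move(task));
		}
		m_CV.notify_one();
	}

	// Worker loop: drains tasks until Join() is called and the queue is empty.
	void Run() {
		for (;;) {
			std::function<void()> task;
			{
				std::unique_lock<std::mutex> lock(m_Mutex);
				m_CV.wait(lock, [this] { return m_Joined || !m_Tasks.empty(); });
				if (m_Tasks.empty())
					return;
				task = std::move(m_Tasks.front());
				m_Tasks.pop();
			}
			task(); // e.g. format the metric and write it to the TCP/HTTP backend
		}
	}

	void Join() {
		{
			std::lock_guard<std::mutex> lock(m_Mutex);
			m_Joined = true;
		}
		m_CV.notify_all();
	}

private:
	std::mutex m_Mutex;
	std::condition_variable m_CV;
	std::queue<std::function<void()>> m_Tasks;
	bool m_Joined = false;
};

int main() {
	WorkQueue wq;
	std::thread worker(&WorkQueue::Run, &wq);

	// "Check result handler": returns immediately, processing happens asynchronously.
	wq.Enqueue([]() { std::cout << "processing check result asynchronously\n"; });

	wq.Join();
	worker.join();
}
```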
The GraphiteWriter uses a TCP socket to communicate with the carbon cache
daemon of Graphite. The InfluxDBWriter is instead writing bulk metric messages
to InfluxDB's HTTP API.
to InfluxDB's HTTP API, similar to Elasticsearch.
## Cluster <a id="technical-concepts-cluster"></a>
@ -322,10 +324,10 @@ The update procedure works the same way as above.
### High Availability <a id="technical-concepts-cluster-ha"></a>
High availability is automatically enabled between two nodes in the same
General high availability is automatically enabled between two endpoints in the same
cluster zone.
This requires the same configuration and enabled features on both nodes.
**This requires the same configuration and enabled features on both nodes.**
HA zone members trust each other and share event updates as cluster messages.
This includes for example check results, next check timestamp updates, acknowledgements
@ -334,18 +336,52 @@ or notifications.
This ensures that both nodes are synchronized. If one node goes away, the
remaining node takes over and continues as normal.
#### High Availability: Object Authority <a id="technical-concepts-cluster-ha-object-authority"></a>
Cluster nodes automatically determine the authority for configuration
objects. This results in activated but paused objects. You can verify
objects. By default, all config objects are set to `HARunEverywhere` and
as such the object authority is true for any config object on any instance.
Specific config object types can override this setting, e.g. with `HARunOnce`,
prior to config object activation.
This is done when the daemon starts and at a regular interval inside
the ApiListener class, specifically by calling `ApiListener::UpdateObjectAuthority()`.
The algorithm works like this:
* Determine whether this instance is assigned to a local zone and endpoint.
* Collect all endpoints in this zone which are connected.
* If there are two endpoints, but only the local endpoint sees itself and the application started less than 60 seconds ago, do nothing (grace period: wait for the cluster reconnect to take place).
* Sort the collected endpoints by name.
* Iterate over all config types and their respective objects:
    * Ignore objects which are not active.
    * Ignore objects which are not `HARunOnce`. These can run multiple times in a zone and don't need an authority update.
    * If this instance doesn't have a local zone, set the authority to true. This covers non-clustered standalone environments where everything belongs to this instance.
    * Calculate the object authority based on the connected endpoint names.
    * Set the authority (true or false).
The object authority calculation works "offline" without any message exchange.
Each instance calculates the SDBM hash of the config object name and takes it
modulo the number of connected endpoints.
This index is used to look up the corresponding endpoint in the connected endpoints array,
which includes the local endpoint. If the local endpoint is equal to the selected endpoint,
the authority is set to `true`, otherwise to `false`.
```
authority = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()] == my_endpoint;
```
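As a self-contained illustration (the `sdbm()` helper below mirrors the classic SDBM string hash used by `Utility::SDBM()`; the endpoint and object names are made up), the calculation can be reproduced like this:

```
// Standalone sketch of the offline authority calculation described above.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Classic SDBM string hash: hash(i) = hash(i-1) * 65599 + str[i].
static uint64_t sdbm(const std::string& str)
{
	uint64_t hash = 0;

	for (unsigned char c : str)
		hash = c + (hash << 6) + (hash << 16) - hash;

	return hash;
}

int main()
{
	// Connected endpoints in the local HA zone, sorted by name.
	std::vector<std::string> endpoints = { "master1.localdomain", "master2.localdomain" };
	std::sort(endpoints.begin(), endpoints.end());

	const std::string localEndpoint = "master1.localdomain";
	const std::string objectName = "graphite"; // e.g. a GraphiteWriter feature object

	// Pick an endpoint index purely from the object name and the endpoint count.
	auto index = sdbm(objectName) % endpoints.size();
	bool authority = endpoints[index] == localEndpoint;

	std::cout << "'" << objectName << "' is authoritative on " << endpoints[index]
		<< " (local authority: " << std::boolalpha << authority << ")\n";
}
```

Both endpoints run the same calculation on the same sorted endpoint list and therefore reach the same result without exchanging any cluster messages.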
`ConfigObject::SetAuthority(bool authority)` triggers the following events:
* Authority is true and the object is paused: Resume the object and set `paused` to `false`.
* Authority is false and the object is not paused: Pause the object and set `paused` to `true`.
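A minimal standalone sketch of these two transitions (the real logic lives in `ConfigObject::SetAuthority()`; the class below and its initial paused state are illustrative only):

```
#include <iostream>

// Illustrative stand-in for an HA-aware feature object.
class HAFeature {
public:
	void SetAuthority(bool authority) {
		if (authority && m_Paused) {
			m_Paused = false;
			Resume();          // e.g. reconnect and re-register event handlers
		} else if (!authority && !m_Paused) {
			m_Paused = true;
			Pause();           // e.g. join the work queue and drop the connection
		}
	}

	bool IsPaused() const { return m_Paused; }

private:
	void Resume() { std::cout << "feature resumed\n"; }
	void Pause() { std::cout << "feature paused\n"; }

	bool m_Paused = true;      // assume paused until authority is granted
};

int main() {
	HAFeature graphite;
	graphite.SetAuthority(true);   // this endpoint holds the authority: resume
	graphite.SetAuthority(false);  // authority moved to the other endpoint: pause
}
```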
**This results in activated but paused objects on one endpoint.** You can verify
that by querying the `paused` attribute for all objects via REST API
or debug console.
Nodes inside a HA zone calculate the object authority independent from each other.
The number of endpoints in a zone is defined through the configuration. This number
is used inside a local modulo calculation to determine whether the node feels
responsible for this object or not.
or debug console on both endpoints.
Endpoints inside a HA zone calculate the object authority independent from each other.
This object authority is important for selected features explained below.
Since features are configuration objects too, you must ensure that all nodes
@ -354,6 +390,36 @@ one might have a checker feature on the left node, nothing on the right node.
This leads to late check results because one half is not executed by the right
node which holds half of the object authorities.
By default, features are set to "Run-Everywhere". Specific features which
support HA awareness provide the `enable_ha` configuration attribute. When `enable_ha`
is set to `true` (usually the default), "Run-Once" mode is set and the feature is paused on one endpoint.
```
vim /etc/icinga2/features-enabled/graphite.conf
object GraphiteWriter "graphite" {
...
enable_ha = true
}
```
Once such a feature is paused, there won't be any more event handling, e.g. the Elasticsearch
feature won't process any check results or write to the Elasticsearch REST API.
When the cluster connection drops, the feature configuration object is updated with
the new object authority by the ApiListener timer and resumes its operation. You can see
that by grepping the log file for `resumed` and `paused`.
```
[2018-10-24 13:28:28 +0200] information/GraphiteWriter: 'g-ha' paused.
```
```
[2018-10-24 13:28:28 +0200] information/GraphiteWriter: 'g-ha' resumed.
```
Specific features with HA capabilities are explained below.
### High Availability: Checker <a id="technical-concepts-cluster-ha-checker"></a>
The `checker` feature only executes checks for `Checkable` objects (Host, Service)


@ -49,6 +49,15 @@ void ElasticsearchWriter::OnConfigLoaded()
ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "ElasticsearchWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -71,14 +80,14 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
}
void ElasticsearchWriter::Start(bool runtimeCreated)
void ElasticsearchWriter::Resume()
{
ObjectImpl<ElasticsearchWriter>::Start(runtimeCreated);
ObjectImpl<ElasticsearchWriter>::Resume();
m_EventPrefix = "icinga2.event.";
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
@ -95,14 +104,14 @@ void ElasticsearchWriter::Start(bool runtimeCreated)
Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
}
void ElasticsearchWriter::Stop(bool runtimeRemoved)
void ElasticsearchWriter::Pause()
{
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
m_WorkQueue.Join();
ObjectImpl<ElasticsearchWriter>::Stop(runtimeRemoved);
ObjectImpl<ElasticsearchWriter>::Pause();
}
void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -176,6 +185,9 @@ void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Ch
void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
}
@ -230,6 +242,9 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}
@ -279,6 +294,9 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
notification, checkable, users, type, cr, author, text));
}


@ -41,8 +41,8 @@ public:
protected:
void OnConfigLoaded() override;
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void Resume() override;
void Pause() override;
private:
String m_EventPrefix;


@ -37,6 +37,9 @@ class ElasticsearchWriter : ConfigObject
[config] int flush_threshold {
default {{{ return 1024; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
}


@ -51,6 +51,15 @@ void GelfWriter::OnConfigLoaded()
ObjectImpl<GelfWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GelfWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "GelfWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -75,12 +84,12 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
status->Set("gelfwriter", new Dictionary(std::move(nodes)));
}
void GelfWriter::Start(bool runtimeCreated)
void GelfWriter::Resume()
{
ObjectImpl<GelfWriter>::Start(runtimeCreated);
ObjectImpl<GelfWriter>::Resume();
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
@ -98,14 +107,14 @@ void GelfWriter::Start(bool runtimeCreated)
Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
}
void GelfWriter::Stop(bool runtimeRemoved)
void GelfWriter::Pause()
{
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
m_WorkQueue.Join();
ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
ObjectImpl<GelfWriter>::Pause();
}
void GelfWriter::AssertOnWorkQueue()
@ -131,6 +140,11 @@ void GelfWriter::Reconnect()
{
AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
@ -180,6 +194,9 @@ void GelfWriter::Disconnect()
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
}
@ -284,6 +301,9 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
const String& author, const String& commentText, const String& commandName)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
notification, checkable, user, notificationType, cr, author, commentText, commandName));
}
@ -348,6 +368,9 @@ void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& noti
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}


@ -46,8 +46,8 @@ public:
protected:
void OnConfigLoaded() override;
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void Resume() override;
void Pause() override;
private:
Stream::Ptr m_Stream;


@ -45,6 +45,9 @@ class GelfWriter : ConfigObject
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
}


@ -49,6 +49,15 @@ void GraphiteWriter::OnConfigLoaded()
ObjectImpl<GraphiteWriter>::OnConfigLoaded();
m_WorkQueue.SetName("GraphiteWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "GraphiteWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -72,12 +81,12 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
status->Set("graphitewriter", new Dictionary(std::move(nodes)));
}
void GraphiteWriter::Start(bool runtimeCreated)
void GraphiteWriter::Resume()
{
ObjectImpl<GraphiteWriter>::Start(runtimeCreated);
ObjectImpl<GraphiteWriter>::Resume();
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(std::bind(&GraphiteWriter::ExceptionHandler, this, _1));
@ -93,14 +102,14 @@ void GraphiteWriter::Start(bool runtimeCreated)
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
}
void GraphiteWriter::Stop(bool runtimeRemoved)
void GraphiteWriter::Pause()
{
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
m_WorkQueue.Join();
ObjectImpl<GraphiteWriter>::Stop(runtimeRemoved);
ObjectImpl<GraphiteWriter>::Pause();
}
void GraphiteWriter::AssertOnWorkQueue()
@ -126,6 +135,11 @@ void GraphiteWriter::Reconnect()
{
AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
@ -175,6 +189,9 @@ void GraphiteWriter::Disconnect()
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
}


@ -49,8 +49,8 @@ public:
protected:
void OnConfigLoaded() override;
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void Resume() override;
void Pause() override;
private:
Stream::Ptr m_Stream;


@ -47,6 +47,9 @@ class GraphiteWriter : ConfigObject
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
}


@ -75,6 +75,15 @@ void InfluxdbWriter::OnConfigLoaded()
ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "InfluxdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -100,12 +109,12 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
status->Set("influxdbwriter", new Dictionary(std::move(nodes)));
}
void InfluxdbWriter::Start(bool runtimeCreated)
void InfluxdbWriter::Resume()
{
ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
ObjectImpl<InfluxdbWriter>::Resume();
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback(std::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
@ -121,14 +130,14 @@ void InfluxdbWriter::Start(bool runtimeCreated)
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}
void InfluxdbWriter::Stop(bool runtimeRemoved)
void InfluxdbWriter::Pause()
{
Log(LogInformation, "InfluxdbWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
m_WorkQueue.Join();
ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
ObjectImpl<InfluxdbWriter>::Pause();
}
void InfluxdbWriter::AssertOnWorkQueue()
@ -188,6 +197,9 @@ Stream::Ptr InfluxdbWriter::Connect()
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow);
}


@ -49,8 +49,8 @@ public:
protected:
void OnConfigLoaded() override;
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void Resume() override;
void Pause() override;
private:
WorkQueue m_WorkQueue{10000000, 1};


@ -88,6 +88,9 @@ class InfluxdbWriter : ConfigObject
[config] int flush_threshold {
default {{{ return 1024; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
validator InfluxdbWriter {


@ -44,6 +44,20 @@ REGISTER_TYPE(OpenTsdbWriter);
REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
if (!GetEnableHa()) {
Log(LogDebug, "OpenTsdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
DictionaryData nodes;
@ -55,12 +69,12 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
}
void OpenTsdbWriter::Start(bool runtimeCreated)
void OpenTsdbWriter::Resume()
{
ObjectImpl<OpenTsdbWriter>::Start(runtimeCreated);
ObjectImpl<OpenTsdbWriter>::Resume();
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(10);
@ -71,16 +85,19 @@ void OpenTsdbWriter::Start(bool runtimeCreated)
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
}
void OpenTsdbWriter::Stop(bool runtimeRemoved)
void OpenTsdbWriter::Pause()
{
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
ObjectImpl<OpenTsdbWriter>::Stop(runtimeRemoved);
ObjectImpl<OpenTsdbWriter>::Pause();
}
void OpenTsdbWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
if (m_Stream)
return;
@ -102,6 +119,9 @@ void OpenTsdbWriter::ReconnectTimerHandler()
void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())


@ -44,8 +44,9 @@ public:
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
protected:
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void OnConfigLoaded() override;
void Resume() override;
void Pause() override;
private:
Stream::Ptr m_Stream;


@ -34,6 +34,9 @@ class OpenTsdbWriter : ConfigObject
[config] String port {
default {{{ return "4242"; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
}


@ -38,6 +38,20 @@ REGISTER_TYPE(PerfdataWriter);
REGISTER_STATSFUNCTION(PerfdataWriter, &PerfdataWriter::StatsFunc);
void PerfdataWriter::OnConfigLoaded()
{
ObjectImpl<PerfdataWriter>::OnConfigLoaded();
if (!GetEnableHa()) {
Log(LogDebug, "PerfdataWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void PerfdataWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
DictionaryData nodes;
@ -49,12 +63,12 @@ void PerfdataWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
status->Set("perfdatawriter", new Dictionary(std::move(nodes)));
}
void PerfdataWriter::Start(bool runtimeCreated)
void PerfdataWriter::Resume()
{
ObjectImpl<PerfdataWriter>::Start(runtimeCreated);
ObjectImpl<PerfdataWriter>::Resume();
Log(LogInformation, "PerfdataWriter")
<< "'" << GetName() << "' started.";
<< "'" << GetName() << "' resumed.";
Checkable::OnNewCheckResult.connect(std::bind(&PerfdataWriter::CheckResultHandler, this, _1, _2));
@ -67,12 +81,12 @@ void PerfdataWriter::Start(bool runtimeCreated)
RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
}
void PerfdataWriter::Stop(bool runtimeRemoved)
void PerfdataWriter::Pause()
{
Log(LogInformation, "PerfdataWriter")
<< "'" << GetName() << "' stopped.";
<< "'" << GetName() << "' paused.";
ObjectImpl<PerfdataWriter>::Stop(runtimeRemoved);
ObjectImpl<PerfdataWriter>::Pause();
}
Value PerfdataWriter::EscapeMacroMetric(const Value& value)
@ -85,6 +99,9 @@ Value PerfdataWriter::EscapeMacroMetric(const Value& value)
void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;
CONTEXT("Writing performance data for object '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
@ -154,6 +171,9 @@ void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path,
void PerfdataWriter::RotationTimerHandler()
{
if (IsPaused())
return;
RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath());
RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
}


@ -46,8 +46,9 @@ public:
void ValidateServiceFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override;
protected:
void Start(bool runtimeCreated) override;
void Stop(bool runtimeRemoved) override;
void OnConfigLoaded() override;
void Resume() override;
void Pause() override;
private:
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);


@ -70,6 +70,9 @@ class PerfdataWriter : ConfigObject
[config] double rotation_interval {
default {{{ return 30; }}}
};
[config] bool enable_ha {
default {{{ return true; }}}
};
};
}