diff --git a/doc/10-icinga-template-library.md b/doc/10-icinga-template-library.md
index 9311f1552..e835f5761 100644
--- a/doc/10-icinga-template-library.md
+++ b/doc/10-icinga-template-library.md
@@ -103,6 +103,24 @@ cluster\_zone | **Required.** The zone name. Defaults to `$host.name$`.
cluster\_lag\_warning | **Optional.** Warning threshold for log lag in seconds. Applies if the log lag is greater than the threshold.
cluster\_lag\_critical | **Optional.** Critical threshold for log lag in seconds. Applies if the log lag is greater than the threshold.
+### icingadb
+
+Check command for the built-in `icingadb` check.
+
+Custom variables passed as [command parameters](03-monitoring-basics.md#command-passing-parameters):
+
+Name | Description
+-----------------------------------------|-----------------------------
+icingadb\_name | **Required.** The name of the Icinga DB connection object. Defaults to `icingadb`.
+icingadb\_full\_dump\_duration\_warning | **Optional.** Warning threshold for ongoing Redis dump duration. Applies if the value is higher than the threshold. Defaults to 5 minutes.
+icingadb\_full\_dump\_duration\_critical | **Optional.** Critical threshold for ongoing Redis dump duration. Applies if the value is higher than the threshold. Defaults to 10 minutes.
+icingadb\_full\_sync\_duration\_warning | **Optional.** Warning threshold for ongoing database sync duration. Applies if the value is higher than the threshold. Defaults to 5 minutes.
+icingadb\_full\_sync\_duration\_critical | **Optional.** Critical threshold for ongoing database sync duration. Applies if the value is higher than the threshold. Defaults to 10 minutes.
+icingadb\_redis\_backlog\_warning | **Optional.** Warning threshold for Redis write backlog. Applies if the value is higher than the threshold. Defaults to 5 minutes.
+icingadb\_redis\_backlog\_critical | **Optional.** Critical threshold for Redis write backlog. Applies if the value is higher than the threshold. Defaults to 15 minutes.
+icingadb\_database\_backlog\_warning | **Optional.** Warning threshold for database sync backlog. Applies if the value is higher than the threshold. Defaults to 5 minutes.
+icingadb\_database\_backlog\_critical | **Optional.** Critical threshold for database sync backlog. Applies if the value is higher than the threshold. Defaults to 15 minutes.
+
### ido
Check command for the built-in `ido` check.
diff --git a/lib/icingadb/CMakeLists.txt b/lib/icingadb/CMakeLists.txt
index 71a7c67f2..de8e4adae 100644
--- a/lib/icingadb/CMakeLists.txt
+++ b/lib/icingadb/CMakeLists.txt
@@ -2,8 +2,11 @@
mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp)
+mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp)
+
set(icingadb_SOURCES
icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp
+ icingadbchecktask.cpp icingadb-itl.cpp
)
if(ICINGA2_UNITY_BUILD)
diff --git a/lib/icingadb/icingadb-itl.conf b/lib/icingadb/icingadb-itl.conf
new file mode 100644
index 000000000..5f3950e3d
--- /dev/null
+++ b/lib/icingadb/icingadb-itl.conf
@@ -0,0 +1,24 @@
+/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
+
+System.assert(Internal.run_with_activation_context(function() {
+ template CheckCommand "icingadb-check-command" use (checkFunc = Internal.IcingadbCheck) {
+ execute = checkFunc
+ }
+
+ object CheckCommand "icingadb" {
+ import "icingadb-check-command"
+
+ vars.icingadb_name = "icingadb"
+
+ vars.icingadb_full_dump_duration_warning = 5m
+ vars.icingadb_full_dump_duration_critical = 10m
+ vars.icingadb_full_sync_duration_warning = 5m
+ vars.icingadb_full_sync_duration_critical = 10m
+ vars.icingadb_redis_backlog_warning = 5m
+ vars.icingadb_redis_backlog_critical = 15m
+ vars.icingadb_database_backlog_warning = 5m
+ vars.icingadb_database_backlog_critical = 15m
+ }
+}))
+
+Internal.remove("IcingadbCheck")
diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp
index f0ae7b591..60354d7fd 100644
--- a/lib/icingadb/icingadb-objects.cpp
+++ b/lib/icingadb/icingadb-objects.cpp
@@ -171,6 +171,12 @@ void IcingaDB::UpdateAllConfigObjects()
Log(LogInformation, "IcingaDB") << "Starting initial config/status dump";
double startTime = Utility::GetTime();
+ SetOngoingDumpStart(startTime);
+
+ Defer resetOngoingDumpStart ([this]() {
+ SetOngoingDumpStart(0);
+ });
+
// Use a Workqueue to pack objects in parallel
WorkQueue upq(25000, Configuration::Concurrency, LogNotice);
upq.SetName("IcingaDB:ConfigDump");
@@ -230,18 +236,7 @@ void IcingaDB::UpdateAllConfigObjects()
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
}, Prio::Config);
- Array::Ptr kvs = res->Get(1);
- Value* key = nullptr;
- ObjectLock oLock (kvs);
-
- for (auto& kv : kvs) {
- if (key) {
- redisCheckSums.emplace(std::move(*key), std::move(kv));
- key = nullptr;
- } else {
- key = &kv;
- }
- }
+ AddKvsToMap(res->Get(1), redisCheckSums);
cursor = res->Get(0);
} while (cursor != "0");
@@ -413,6 +408,8 @@ void IcingaDB::UpdateAllConfigObjects()
auto ourEnd (ourCheckSums.end());
auto flushSets ([&]() {
+ auto affectedConfig (setObject.size() / 2u);
+
setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum});
setObject.insert(setObject.begin(), {"HMSET", configObject});
@@ -426,10 +423,12 @@ void IcingaDB::UpdateAllConfigObjects()
setChecksum.clear();
setObject.clear();
- rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
+ rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
});
auto flushDels ([&]() {
+ auto affectedConfig (delObject.size());
+
delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum});
delObject.insert(delObject.begin(), {"HDEL", configObject});
@@ -443,7 +442,7 @@ void IcingaDB::UpdateAllConfigObjects()
delChecksum.clear();
delObject.clear();
- rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
+ rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
});
auto setOne ([&]() {
@@ -535,8 +534,14 @@ void IcingaDB::UpdateAllConfigObjects()
m_Rcon->EnqueueCallback([&p](boost::asio::yield_context& yc) { p.set_value(); }, Prio::Config);
p.get_future().wait();
+ auto endTime (Utility::GetTime());
+ auto took (endTime - startTime);
+
+ SetLastdumpTook(took);
+ SetLastdumpEnd(endTime);
+
Log(LogInformation, "IcingaDB")
- << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
+ << "Initial config/status dump finished in " << took << " seconds.";
}
std::vector>> IcingaDB::ChunkObjects(std::vector> objects, size_t chunkSize) {
@@ -1142,7 +1147,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode)
streamadd.emplace_back(IcingaToStreamValue(kv.second));
}
- m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream);
+ m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream, {0, 1});
}
}
@@ -1189,7 +1194,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
- m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
+ m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
}
if (checkable) {
@@ -2354,7 +2359,7 @@ void IcingaDB::ForwardHistoryEntries()
if (m_Rcon && m_Rcon->IsConnected()) {
try {
- m_Rcon->GetResultsOfQueries(haystack, Prio::History);
+ m_Rcon->GetResultsOfQueries(haystack, Prio::History, {0, 0, haystack.size()});
break;
} catch (const std::exception& ex) {
logFailure(ex.what());
diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp
index 525be4b71..f96656efc 100644
--- a/lib/icingadb/icingadb.cpp
+++ b/lib/icingadb/icingadb.cpp
@@ -29,12 +29,10 @@ std::mutex IcingaDB::m_EnvironmentIdInitMutex;
REGISTER_TYPE(IcingaDB);
-REGISTER_STATSFUNCTION(IcingaDB, &IcingaDB::StatsFunc);
-
IcingaDB::IcingaDB()
: m_Rcon(nullptr)
{
- m_Rcon = nullptr;
+ m_RconLocked.store(nullptr);
m_WorkQueue.SetName("IcingaDB");
@@ -42,28 +40,6 @@ IcingaDB::IcingaDB()
m_PrefixConfigCheckSum = "icinga:checksum:";
}
-/**
- * Feature stats interface
- *
- * @param status Key value pairs for feature stats
- */
-void IcingaDB::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
-{
- DictionaryData nodes;
-
- for (auto& icingadb : ConfigType::GetObjectsByType()) {
- auto historyBufferItems (icingadb->m_HistoryBulker.Size());
-
- nodes.emplace_back(icingadb->GetName(), new Dictionary({
- { "history_buffer_items", historyBufferItems }
- }));
-
- perfdata->Add(new PerfdataValue("icingadb_" + icingadb->GetName() + "_history_buffer_items", historyBufferItems));
- }
-
- status->Set("icingadb", new Dictionary(std::move(nodes)));
-}
-
void IcingaDB::Validate(int types, const ValidationUtils& utils)
{
ObjectImpl::Validate(types, utils);
@@ -104,6 +80,7 @@ void IcingaDB::Start(bool runtimeCreated)
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(),
GetEnableTls(), GetInsecureNoverify(), GetCertPath(), GetKeyPath(), GetCaPath(), GetCrlPath(),
GetTlsProtocolmin(), GetCipherList(), GetConnectTimeout(), GetDebugInfo());
+ m_RconLocked.store(m_Rcon);
for (const Type::Ptr& type : GetTypes()) {
auto ctype (dynamic_cast(type.get()));
diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp
index ef4483a7a..c08f36465 100644
--- a/lib/icingadb/icingadb.hpp
+++ b/lib/icingadb/icingadb.hpp
@@ -5,6 +5,7 @@
#include "icingadb/icingadb-ti.hpp"
#include "icingadb/redisconnection.hpp"
+#include "base/atomic.hpp"
#include "base/bulker.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
@@ -37,7 +38,6 @@ public:
IcingaDB();
static void ConfigStaticInitialize();
- static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
void Validate(int types, const ValidationUtils& utils) override;
virtual void Start(bool runtimeCreated) override;
@@ -45,6 +45,27 @@ public:
String GetEnvironmentId() const override;
+ inline RedisConnection::Ptr GetConnection()
+ {
+ return m_RconLocked.load();
+ }
+
+ template
+ static void AddKvsToMap(const Array::Ptr& kvs, T& map)
+ {
+ Value* key = nullptr;
+ ObjectLock oLock (kvs);
+
+ for (auto& kv : kvs) {
+ if (key) {
+ map.emplace(std::move(*key), std::move(kv));
+ key = nullptr;
+ } else {
+ key = &kv;
+ }
+ }
+ }
+
protected:
void ValidateTlsProtocolmin(const Lazy& lvalue, const ValidationUtils& utils) override;
void ValidateConnectTimeout(const Lazy& lvalue, const ValidationUtils& utils) override;
@@ -195,6 +216,10 @@ private:
bool m_ConfigDumpDone;
RedisConnection::Ptr m_Rcon;
+ // m_RconLocked containes a copy of the value in m_Rcon where all accesses are guarded by a mutex to allow safe
+ // concurrent access like from the icingadb check command. It's a copy to still allow fast access without additional
+ // syncronization to m_Rcon within the IcingaDB feature itself.
+ Locked m_RconLocked;
std::unordered_map m_Rcons;
std::atomic_size_t m_PendingRcons;
diff --git a/lib/icingadb/icingadb.ti b/lib/icingadb/icingadb.ti
index 00ca95691..1c649c8e4 100644
--- a/lib/icingadb/icingadb.ti
+++ b/lib/icingadb/icingadb.ti
@@ -48,6 +48,16 @@ class IcingaDB : ConfigObject
[no_storage] String environment_id {
get;
};
+
+ [set_protected] double ongoing_dump_start {
+ default {{{ return 0; }}}
+ };
+ [state, set_protected] double lastdump_end {
+ default {{{ return 0; }}}
+ };
+ [state, set_protected] double lastdump_took {
+ default {{{ return 0; }}}
+ };
};
}
diff --git a/lib/icingadb/icingadbchecktask.cpp b/lib/icingadb/icingadbchecktask.cpp
new file mode 100644
index 000000000..c2f1a3699
--- /dev/null
+++ b/lib/icingadb/icingadbchecktask.cpp
@@ -0,0 +1,514 @@
+/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
+
+#include "icingadb/icingadbchecktask.hpp"
+#include "icinga/host.hpp"
+#include "icinga/checkcommand.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/pluginutility.hpp"
+#include "base/function.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/convert.hpp"
+#include
+
+using namespace icinga;
+
+REGISTER_FUNCTION_NONCONST(Internal, IcingadbCheck, &IcingadbCheckTask::ScriptFunc, "checkable:cr:resolvedMacros:useResolvedMacros");
+
+static void ReportIcingadbCheck(
+ const Checkable::Ptr& checkable, const CheckCommand::Ptr& commandObj,
+ const CheckResult::Ptr& cr, String output, ServiceState state)
+{
+ if (Checkable::ExecuteCommandProcessFinishedHandler) {
+ double now = Utility::GetTime();
+ ProcessResult pr;
+ pr.PID = -1;
+ pr.Output = std::move(output);
+ pr.ExecutionStart = now;
+ pr.ExecutionEnd = now;
+ pr.ExitStatus = state;
+
+ Checkable::ExecuteCommandProcessFinishedHandler(commandObj->GetName(), pr);
+ } else {
+ cr->SetState(state);
+ cr->SetOutput(output);
+ checkable->ProcessCheckResult(cr);
+ }
+}
+
+static inline
+double GetXMessageTs(const Array::Ptr& xMessage)
+{
+ return Convert::ToLong(String(xMessage->Get(0)).Split("-")[0]) / 1000.0;
+}
+
+void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
+ const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros)
+{
+ CheckCommand::Ptr commandObj = CheckCommand::ExecuteOverride ? CheckCommand::ExecuteOverride : checkable->GetCheckCommand();
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ MacroProcessor::ResolverList resolvers;
+ String silenceMissingMacroWarning;
+
+ if (MacroResolver::OverrideMacros)
+ resolvers.emplace_back("override", MacroResolver::OverrideMacros);
+
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+ resolvers.emplace_back("command", commandObj);
+ resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
+
+ auto resolve ([&](const String& macro) {
+ return MacroProcessor::ResolveMacros(macro, resolvers, checkable->GetLastCheckResult(),
+ &silenceMissingMacroWarning, MacroProcessor::EscapeCallback(), resolvedMacros, useResolvedMacros);
+ });
+
+ struct Thresholds
+ {
+ Value Warning, Critical;
+ };
+
+ auto resolveThresholds ([&resolve](const String& wmacro, const String& cmacro) {
+ return Thresholds{resolve(wmacro), resolve(cmacro)};
+ });
+
+ String icingadbName = resolve("$icingadb_name$");
+
+ auto dumpTakesThresholds (resolveThresholds("$icingadb_full_dump_duration_warning$", "$icingadb_full_dump_duration_critical$"));
+ auto syncTakesThresholds (resolveThresholds("$icingadb_full_sync_duration_warning$", "$icingadb_full_sync_duration_critical$"));
+ auto icingaBacklogThresholds (resolveThresholds("$icingadb_redis_backlog_warning$", "$icingadb_redis_backlog_critical$"));
+ auto icingadbBacklogThresholds (resolveThresholds("$icingadb_database_backlog_warning$", "$icingadb_database_backlog_critical$"));
+
+ if (resolvedMacros && !useResolvedMacros)
+ return;
+
+ if (icingadbName.IsEmpty()) {
+ ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Attribute 'icingadb_name' must be set.", ServiceUnknown);
+ return;
+ }
+
+ auto conn (IcingaDB::GetByName(icingadbName));
+
+ if (!conn) {
+ ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Icinga DB connection '" + icingadbName + "' does not exist.", ServiceUnknown);
+ return;
+ }
+
+ auto redis (conn->GetConnection());
+
+ if (!redis || !redis->GetConnected()) {
+ ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB CRITICAL: Not connected to Redis.", ServiceCritical);
+ return;
+ }
+
+ auto now (Utility::GetTime());
+ Array::Ptr redisTime, xReadHeartbeat, xReadStats, xReadRuntimeBacklog, xReadHistoryBacklog;
+
+ try {
+ auto replies (redis->GetResultsOfQueries(
+ {
+ {"TIME"},
+ {"XREAD", "STREAMS", "icingadb:telemetry:heartbeat", "0-0"},
+ {"XREAD", "STREAMS", "icingadb:telemetry:stats", "0-0"},
+ {"XREAD", "COUNT", "1", "STREAMS", "icinga:runtime", "icinga:runtime:state", "0-0", "0-0"},
+ {
+ "XREAD", "COUNT", "1", "STREAMS",
+ "icinga:history:stream:acknowledgement",
+ "icinga:history:stream:comment",
+ "icinga:history:stream:downtime",
+ "icinga:history:stream:flapping",
+ "icinga:history:stream:notification",
+ "icinga:history:stream:state",
+ "0-0", "0-0", "0-0", "0-0", "0-0", "0-0",
+ }
+ },
+ RedisConnection::QueryPriority::Heartbeat
+ ));
+
+ redisTime = std::move(replies.at(0));
+ xReadHeartbeat = std::move(replies.at(1));
+ xReadStats = std::move(replies.at(2));
+ xReadRuntimeBacklog = std::move(replies.at(3));
+ xReadHistoryBacklog = std::move(replies.at(4));
+ } catch (const std::exception& ex) {
+ ReportIcingadbCheck(
+ checkable, commandObj, cr,
+ String("Icinga DB CRITICAL: Could not query Redis: ") + ex.what(), ServiceCritical
+ );
+ return;
+ }
+
+ if (!xReadHeartbeat) {
+ ReportIcingadbCheck(
+ checkable, commandObj, cr,
+ "Icinga DB CRITICAL: The Icinga DB daemon seems to have never run. (Missing heartbeat)",
+ ServiceCritical
+ );
+
+ return;
+ }
+
+ auto redisOldestPending (redis->GetOldestPendingQueryTs());
+ auto ongoingDumpStart (conn->GetOngoingDumpStart());
+ auto dumpWhen (conn->GetLastdumpEnd());
+ auto dumpTook (conn->GetLastdumpTook());
+
+ auto redisNow (Convert::ToLong(redisTime->Get(0)) + Convert::ToLong(redisTime->Get(1)) / 1000000.0);
+ Array::Ptr heartbeatMessage = Array::Ptr(Array::Ptr(xReadHeartbeat->Get(0))->Get(1))->Get(0);
+ auto heartbeatTime (GetXMessageTs(heartbeatMessage));
+ std::map heartbeatData;
+
+ IcingaDB::AddKvsToMap(heartbeatMessage->Get(1), heartbeatData);
+
+ String version = heartbeatData.at("version");
+ auto icingadbNow (Convert::ToLong(heartbeatData.at("time")) / 1000.0 + (redisNow - heartbeatTime));
+ auto icingadbStartTime (Convert::ToLong(heartbeatData.at("start-time")) / 1000.0);
+ String errMsg (heartbeatData.at("error"));
+ auto errSince (Convert::ToLong(heartbeatData.at("error-since")) / 1000.0);
+ String perfdataFromRedis = heartbeatData.at("performance-data");
+ auto heartbeatLastReceived (Convert::ToLong(heartbeatData.at("last-heartbeat-received")) / 1000.0);
+ bool weResponsible = Convert::ToLong(heartbeatData.at("ha-responsible"));
+ auto weResponsibleTs (Convert::ToLong(heartbeatData.at("ha-responsible-ts")) / 1000.0);
+ bool otherResponsible = Convert::ToLong(heartbeatData.at("ha-other-responsible"));
+ auto syncOngoingSince (Convert::ToLong(heartbeatData.at("sync-ongoing-since")) / 1000.0);
+ auto syncSuccessWhen (Convert::ToLong(heartbeatData.at("sync-success-finish")) / 1000.0);
+ auto syncSuccessTook (Convert::ToLong(heartbeatData.at("sync-success-duration")) / 1000.0);
+
+ std::ostringstream i2okmsgs, idbokmsgs, warnmsgs, critmsgs;
+ Array::Ptr perfdata = new Array();
+
+ i2okmsgs << std::fixed << std::setprecision(3);
+ idbokmsgs << std::fixed << std::setprecision(3);
+ warnmsgs << std::fixed << std::setprecision(3);
+ critmsgs << std::fixed << std::setprecision(3);
+
+ const auto downForCritical (10);
+ auto downFor (redisNow - heartbeatTime);
+ bool down = false;
+
+ if (downFor > downForCritical) {
+ down = true;
+
+ critmsgs << " Last seen " << Utility::FormatDuration(downFor)
+ << " ago, greater than CRITICAL threshold (" << Utility::FormatDuration(downForCritical) << ")!";
+ } else {
+ idbokmsgs << "\n* Last seen: " << Utility::FormatDuration(downFor) << " ago";
+ }
+
+ perfdata->Add(new PerfdataValue("icingadb_heartbeat_age", downFor, false, "seconds", Empty, downForCritical, 0));
+
+ const auto errForCritical (10);
+ auto err (!errMsg.IsEmpty());
+ auto errFor (icingadbNow - errSince);
+
+ if (err) {
+ if (errFor > errForCritical) {
+ critmsgs << " ERROR: " << errMsg << "!";
+ }
+
+ perfdata->Add(new PerfdataValue("error_for", errFor * (err ? 1 : -1), false, "seconds", Empty, errForCritical, 0));
+ }
+
+ if (!down) {
+ const auto heartbeatLagWarning (3/* Icinga DB read freq. */ + 1/* Icinga DB write freq. */ + 2/* threshold */);
+ auto heartbeatLag (fmin(icingadbNow - heartbeatLastReceived, 10 * 60));
+
+ if (!heartbeatLastReceived) {
+ critmsgs << " Lost Icinga 2 heartbeat!";
+ } else if (heartbeatLag > heartbeatLagWarning) {
+ warnmsgs << " Icinga 2 heartbeat lag: " << Utility::FormatDuration(heartbeatLag)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(heartbeatLagWarning) << ").";
+ }
+
+ perfdata->Add(new PerfdataValue("icinga2_heartbeat_age", heartbeatLag, false, "seconds", heartbeatLagWarning, Empty, 0));
+ }
+
+ if (weResponsible) {
+ idbokmsgs << "\n* Responsible";
+ } else if (otherResponsible) {
+ idbokmsgs << "\n* Not responsible, but another instance is";
+ } else {
+ critmsgs << " No instance is responsible!";
+ }
+
+ perfdata->Add(new PerfdataValue("icingadb_responsible_instances", int(weResponsible || otherResponsible), false, "", Empty, Empty, 0, 1));
+
+ const auto clockDriftWarning (5);
+ const auto clockDriftCritical (30);
+ auto clockDrift (std::max({
+ fabs(now - redisNow),
+ fabs(redisNow - icingadbNow),
+ fabs(icingadbNow - now),
+ }));
+
+ if (clockDrift > clockDriftCritical) {
+ critmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(clockDriftCritical) << ")!";
+ } else if (clockDrift > clockDriftWarning) {
+ warnmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(clockDriftWarning) << ").";
+ }
+
+ perfdata->Add(new PerfdataValue("clock_drift", clockDrift, false, "seconds", clockDriftWarning, clockDriftCritical, 0));
+
+ if (ongoingDumpStart) {
+ auto ongoingDumpTakes (now - ongoingDumpStart);
+
+ if (!dumpTakesThresholds.Critical.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Critical) {
+ critmsgs << " Current Icinga 2 full dump already takes " << Utility::FormatDuration(ongoingDumpTakes)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(dumpTakesThresholds.Critical) << ")!";
+ } else if (!dumpTakesThresholds.Warning.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Warning) {
+ warnmsgs << " Current Icinga 2 full dump already takes " << Utility::FormatDuration(ongoingDumpTakes)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(dumpTakesThresholds.Warning) << ").";
+ } else {
+ i2okmsgs << "\n* Current full dump running for " << Utility::FormatDuration(ongoingDumpTakes);
+ }
+
+ perfdata->Add(new PerfdataValue("icinga2_current_full_dump_duration", ongoingDumpTakes, false, "seconds",
+ dumpTakesThresholds.Warning, dumpTakesThresholds.Critical, 0));
+ }
+
+ if (!down && syncOngoingSince) {
+ auto ongoingSyncTakes (icingadbNow - syncOngoingSince);
+
+ if (!syncTakesThresholds.Critical.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Critical) {
+ critmsgs << " Current full sync already takes " << Utility::FormatDuration(ongoingSyncTakes)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(syncTakesThresholds.Critical) << ")!";
+ } else if (!syncTakesThresholds.Warning.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Warning) {
+ warnmsgs << " Current full sync already takes " << Utility::FormatDuration(ongoingSyncTakes)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(syncTakesThresholds.Warning) << ").";
+ } else {
+ idbokmsgs << "\n* Current full sync running for " << Utility::FormatDuration(ongoingSyncTakes);
+ }
+
+ perfdata->Add(new PerfdataValue("icingadb_current_full_sync_duration", ongoingSyncTakes, false, "seconds",
+ syncTakesThresholds.Warning, syncTakesThresholds.Critical, 0));
+ }
+
+ auto redisBacklog (now - redisOldestPending);
+
+ if (!redisOldestPending) {
+ redisBacklog = 0;
+ }
+
+ if (!icingaBacklogThresholds.Critical.IsEmpty() && redisBacklog > icingaBacklogThresholds.Critical) {
+ critmsgs << " Icinga 2 Redis query backlog: " << Utility::FormatDuration(redisBacklog)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Critical) << ")!";
+ } else if (!icingaBacklogThresholds.Warning.IsEmpty() && redisBacklog > icingaBacklogThresholds.Warning) {
+ warnmsgs << " Icinga 2 Redis query backlog: " << Utility::FormatDuration(redisBacklog)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Warning) << ").";
+ }
+
+ perfdata->Add(new PerfdataValue("icinga2_redis_query_backlog", redisBacklog, false, "seconds",
+ icingaBacklogThresholds.Warning, icingaBacklogThresholds.Critical, 0));
+
+ if (!down) {
+ auto getBacklog = [redisNow](const Array::Ptr& streams) -> double {
+ if (!streams) {
+ return 0;
+ }
+
+ double minTs = 0;
+ ObjectLock lock (streams);
+
+ for (Array::Ptr stream : streams) {
+ auto ts (GetXMessageTs(Array::Ptr(stream->Get(1))->Get(0)));
+
+ if (minTs == 0 || ts < minTs) {
+ minTs = ts;
+ }
+ }
+
+ if (minTs > 0) {
+ return redisNow - minTs;
+ } else {
+ return 0;
+ }
+ };
+
+ double historyBacklog = getBacklog(xReadHistoryBacklog);
+
+ if (!icingadbBacklogThresholds.Critical.IsEmpty() && historyBacklog > icingadbBacklogThresholds.Critical) {
+ critmsgs << " History backlog: " << Utility::FormatDuration(historyBacklog)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Critical) << ")!";
+ } else if (!icingadbBacklogThresholds.Warning.IsEmpty() && historyBacklog > icingadbBacklogThresholds.Warning) {
+ warnmsgs << " History backlog: " << Utility::FormatDuration(historyBacklog)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Warning) << ").";
+ }
+
+ perfdata->Add(new PerfdataValue("icingadb_history_backlog", historyBacklog, false, "seconds",
+ icingadbBacklogThresholds.Warning, icingadbBacklogThresholds.Critical, 0));
+
+ double runtimeBacklog = 0;
+
+ if (weResponsible && !syncOngoingSince) {
+ // These streams are only processed by the responsible instance after the full sync finished,
+ // it's fine for some backlog to exist otherwise.
+ runtimeBacklog = getBacklog(xReadRuntimeBacklog);
+
+ if (!icingadbBacklogThresholds.Critical.IsEmpty() && runtimeBacklog > icingadbBacklogThresholds.Critical) {
+ critmsgs << " Runtime update backlog: " << Utility::FormatDuration(runtimeBacklog)
+ << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Critical) << ")!";
+ } else if (!icingadbBacklogThresholds.Warning.IsEmpty() && runtimeBacklog > icingadbBacklogThresholds.Warning) {
+ warnmsgs << " Runtime update backlog: " << Utility::FormatDuration(runtimeBacklog)
+ << ", greater than WARNING threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Warning) << ").";
+ }
+ }
+
+ // Also report the perfdata value on the standby instance or during a full sync (as 0 in this case).
+ perfdata->Add(new PerfdataValue("icingadb_runtime_update_backlog", runtimeBacklog, false, "seconds",
+ icingadbBacklogThresholds.Warning, icingadbBacklogThresholds.Critical, 0));
+ }
+
+ auto dumpAgo (now - dumpWhen);
+
+ if (dumpWhen) {
+ perfdata->Add(new PerfdataValue("icinga2_last_full_dump_ago", dumpAgo, false, "seconds", Empty, Empty, 0));
+ }
+
+ if (dumpTook) {
+ perfdata->Add(new PerfdataValue("icinga2_last_full_dump_duration", dumpTook, false, "seconds", Empty, Empty, 0));
+ }
+
+ if (dumpWhen && dumpTook) {
+ i2okmsgs << "\n* Last full dump: " << Utility::FormatDuration(dumpAgo)
+ << " ago, took " << Utility::FormatDuration(dumpTook);
+ }
+
+ auto icingadbUptime (icingadbNow - icingadbStartTime);
+
+ if (!down) {
+ perfdata->Add(new PerfdataValue("icingadb_uptime", icingadbUptime, false, "seconds", Empty, Empty, 0));
+ }
+
+ {
+ Array::Ptr values = PluginUtility::SplitPerfdata(perfdataFromRedis);
+ ObjectLock lock (values);
+
+ for (auto& v : values) {
+ perfdata->Add(PerfdataValue::Parse(v));
+ }
+ }
+
+ if (weResponsibleTs) {
+ perfdata->Add(new PerfdataValue("icingadb_responsible_for",
+ (weResponsible ? 1 : -1) * (icingadbNow - weResponsibleTs), false, "seconds"));
+ }
+
+ auto syncAgo (icingadbNow - syncSuccessWhen);
+
+ if (syncSuccessWhen) {
+ perfdata->Add(new PerfdataValue("icingadb_last_full_sync_ago", syncAgo, false, "seconds", Empty, Empty, 0));
+ }
+
+ if (syncSuccessTook) {
+ perfdata->Add(new PerfdataValue("icingadb_last_full_sync_duration", syncSuccessTook, false, "seconds", Empty, Empty, 0));
+ }
+
+ if (syncSuccessWhen && syncSuccessTook) {
+ idbokmsgs << "\n* Last full sync: " << Utility::FormatDuration(syncAgo)
+ << " ago, took " << Utility::FormatDuration(syncSuccessTook);
+ }
+
+ std::map statsPerOp;
+
+ const char * const icingadbKnownStats[] = {
+ "config_sync", "state_sync", "history_sync", "overdue_sync", "history_cleanup"
+ };
+
+ for (auto metric : icingadbKnownStats) {
+ statsPerOp.emplace(std::piecewise_construct, std::forward_as_tuple(metric), std::forward_as_tuple(15 * 60));
+ }
+
+ if (xReadStats) {
+ Array::Ptr messages = Array::Ptr(xReadStats->Get(0))->Get(1);
+ ObjectLock lock (messages);
+
+ for (Array::Ptr message : messages) {
+ auto ts (GetXMessageTs(message));
+ std::map opsPerSec;
+
+ IcingaDB::AddKvsToMap(message->Get(1), opsPerSec);
+
+ for (auto& kv : opsPerSec) {
+ auto buf (statsPerOp.find(kv.first));
+
+ if (buf == statsPerOp.end()) {
+ buf = statsPerOp.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(kv.first), std::forward_as_tuple(15 * 60)
+ ).first;
+ }
+
+ buf->second.InsertValue(ts, Convert::ToLong(kv.second));
+ }
+ }
+ }
+
+ for (auto& kv : statsPerOp) {
+ perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_1min", kv.second.UpdateAndGetValues(now, 60), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_5mins", kv.second.UpdateAndGetValues(now, 5 * 60), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_15mins", kv.second.UpdateAndGetValues(now, 15 * 60), false, "", Empty, Empty, 0));
+ }
+
+ perfdata->Add(new PerfdataValue("icinga2_redis_queries_1min", redis->GetQueryCount(60), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue("icinga2_redis_queries_5mins", redis->GetQueryCount(5 * 60), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue("icinga2_redis_queries_15mins", redis->GetQueryCount(15 * 60), false, "", Empty, Empty, 0));
+
+ perfdata->Add(new PerfdataValue("icinga2_redis_pending_queries", redis->GetPendingQueryCount(), false, "", Empty, Empty, 0));
+
+ struct {
+ const char * Name;
+ int (RedisConnection::* Getter)(RingBuffer::SizeType span, RingBuffer::SizeType tv);
+ } const icingaWriteSubjects[] = {
+ {"config_dump", &RedisConnection::GetWrittenConfigFor},
+ {"state_dump", &RedisConnection::GetWrittenStateFor},
+ {"history_dump", &RedisConnection::GetWrittenHistoryFor}
+ };
+
+ for (auto subject : icingaWriteSubjects) {
+ perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_1min", (redis.get()->*subject.Getter)(60, now), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_5mins", (redis.get()->*subject.Getter)(5 * 60, now), false, "", Empty, Empty, 0));
+ perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_15mins", (redis.get()->*subject.Getter)(15 * 60, now), false, "", Empty, Empty, 0));
+ }
+
+ ServiceState state;
+ std::ostringstream msgbuf;
+ auto i2okmsg (i2okmsgs.str());
+ auto idbokmsg (idbokmsgs.str());
+ auto warnmsg (warnmsgs.str());
+ auto critmsg (critmsgs.str());
+
+ msgbuf << "Icinga DB ";
+
+ if (!critmsg.empty()) {
+ state = ServiceCritical;
+ msgbuf << "CRITICAL:" << critmsg;
+
+ if (!warnmsg.empty()) {
+ msgbuf << "\n\nWARNING:" << warnmsg;
+ }
+ } else if (!warnmsg.empty()) {
+ state = ServiceWarning;
+ msgbuf << "WARNING:" << warnmsg;
+ } else {
+ state = ServiceOK;
+ msgbuf << "OK: Uptime: " << Utility::FormatDuration(icingadbUptime) << ". Version: " << version << ".";
+ }
+
+ if (!i2okmsg.empty()) {
+ msgbuf << "\n\nIcinga 2:\n" << i2okmsg;
+ }
+
+ if (!idbokmsg.empty()) {
+ msgbuf << "\n\nIcinga DB:\n" << idbokmsg;
+ }
+
+ cr->SetPerformanceData(perfdata);
+ ReportIcingadbCheck(checkable, commandObj, cr, msgbuf.str(), state);
+}
diff --git a/lib/icingadb/icingadbchecktask.hpp b/lib/icingadb/icingadbchecktask.hpp
new file mode 100644
index 000000000..ba7d61b1e
--- /dev/null
+++ b/lib/icingadb/icingadbchecktask.hpp
@@ -0,0 +1,29 @@
+/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
+
+#ifndef ICINGADBCHECKTASK_H
+#define ICINGADBCHECKTASK_H
+
+#include "icingadb/icingadb.hpp"
+#include "icinga/checkable.hpp"
+
+namespace icinga
+{
+
+/**
+ * Icinga DB check.
+ *
+ * @ingroup icingadb
+ */
+class IcingadbCheckTask
+{
+public:
+ static void ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
+ const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros);
+
+private:
+ IcingadbCheckTask();
+};
+
+}
+
+#endif /* ICINGADBCHECKTASK_H */
diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp
index f2ae3fa7b..d9233ca77 100644
--- a/lib/icingadb/redisconnection.cpp
+++ b/lib/icingadb/redisconnection.cpp
@@ -113,7 +113,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
* @param query Redis query
* @param priority The query's priority
*/
-void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
+void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
{
{
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
@@ -121,9 +121,10 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
}
auto item (Shared::Make(std::move(query)));
+ auto ctime (Utility::GetTime());
- asio::post(m_Strand, [this, item, priority]() {
- m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects});
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
@@ -135,7 +136,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
* @param queries Redis queries
* @param priority The queries' priority
*/
-void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
+void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
{
for (auto& query : queries) {
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
@@ -143,9 +144,10 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
}
auto item (Shared::Make(std::move(queries)));
+ auto ctime (Utility::GetTime());
- asio::post(m_Strand, [this, item, priority]() {
- m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
m_QueuedWrites.Set();
IncreasePendingQueries(item->size());
});
@@ -159,7 +161,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
*
* @return The response
*/
-RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
+RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
{
{
Log msg (LogDebug, "IcingaDB", "Executing query:");
@@ -169,9 +171,10 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
std::promise promise;
auto future (promise.get_future());
auto item (Shared>>::Make(std::move(query), std::move(promise)));
+ auto ctime (Utility::GetTime());
- asio::post(m_Strand, [this, item, priority]() {
- m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
@@ -189,7 +192,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
*
* @return The responses
*/
-RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
+RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
{
for (auto& query : queries) {
Log msg (LogDebug, "IcingaDB", "Executing query:");
@@ -199,9 +202,10 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
std::promise promise;
auto future (promise.get_future());
auto item (Shared>>::Make(std::move(queries), std::move(promise)));
+ auto ctime (Utility::GetTime());
- asio::post(m_Strand, [this, item, priority]() {
- m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects});
m_QueuedWrites.Set();
IncreasePendingQueries(item->first.size());
});
@@ -213,8 +217,10 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
void RedisConnection::EnqueueCallback(const std::function& callback, RedisConnection::QueryPriority priority)
{
- asio::post(m_Strand, [this, callback, priority]() {
- m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback});
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, callback, priority, ctime]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
m_QueuedWrites.Set();
});
}
@@ -230,6 +236,36 @@ void RedisConnection::Sync()
GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
}
+/**
+ * Get the enqueue time of the oldest still queued Redis query
+ *
+ * @return *nix timestamp or 0
+ */
+double RedisConnection::GetOldestPendingQueryTs()
+{
+ auto promise (Shared>::Make());
+ auto future (promise->get_future());
+
+ asio::post(m_Strand, [this, promise]() {
+ double oldest = 0;
+
+ for (auto& queue : m_Queues.Writes) {
+ if (m_SuppressedQueryKinds.find(queue.first) == m_SuppressedQueryKinds.end() && !queue.second.empty()) {
+ auto ctime (queue.second.front().CTime);
+
+ if (ctime < oldest || oldest == 0) {
+ oldest = ctime;
+ }
+ }
+ }
+
+ promise->set_value(oldest);
+ });
+
+ future.wait();
+ return future.get();
+}
+
/**
* Mark kind as kind of queries not to actually send yet
*
@@ -625,6 +661,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
if (next.Callback) {
next.Callback(yc);
}
+
+ RecordAffected(next.Affects, Utility::GetTime());
}
/**
@@ -674,6 +712,11 @@ void RedisConnection::SetConnectedCallback(std::functionm_Strand, [parent, affected, when]() {
+ parent->RecordAffected(affected, when);
+ });
+ } else {
+ if (affected.Config) {
+ m_WrittenConfig.InsertValue(when, affected.Config);
+ }
+
+ if (affected.State) {
+ m_WrittenState.InsertValue(when, affected.State);
+ }
+
+ if (affected.History) {
+ m_WrittenHistory.InsertValue(when, affected.History);
+ }
+ }
+}
diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp
index fb9d1e584..f346ba285 100644
--- a/lib/icingadb/redisconnection.hpp
+++ b/lib/icingadb/redisconnection.hpp
@@ -74,6 +74,16 @@ namespace icinga
SyncConnection = 255
};
+ struct QueryAffects
+ {
+ size_t Config;
+ size_t State;
+ size_t History;
+
+ QueryAffects(size_t config = 0, size_t state = 0, size_t history = 0)
+ : Config(config), State(state), History(history) { }
+ };
+
RedisConnection(const String& host, int port, const String& path, const String& password, int db,
bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr);
@@ -84,20 +94,48 @@ namespace icinga
bool IsConnected();
- void FireAndForgetQuery(Query query, QueryPriority priority);
- void FireAndForgetQueries(Queries queries, QueryPriority priority);
+ void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {});
+ void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
- Reply GetResultOfQuery(Query query, QueryPriority priority);
- Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
+ Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {});
+ Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
void EnqueueCallback(const std::function& callback, QueryPriority priority);
void Sync();
+ double GetOldestPendingQueryTs();
void SuppressQueryKind(QueryPriority kind);
void UnsuppressQueryKind(QueryPriority kind);
void SetConnectedCallback(std::function callback);
+ inline bool GetConnected()
+ {
+ return m_Connected.load();
+ }
+
+ int GetQueryCount(RingBuffer::SizeType span);
+
+ inline int GetPendingQueryCount()
+ {
+ return m_PendingQueries;
+ }
+
+ inline int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
+ {
+ return m_WrittenConfig.UpdateAndGetValues(tv, span);
+ }
+
+ inline int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
+ {
+ return m_WrittenState.UpdateAndGetValues(tv, span);
+ }
+
+ inline int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
+ {
+ return m_WrittenHistory.UpdateAndGetValues(tv, span);
+ }
+
private:
/**
* What to do with the responses to Redis queries.
@@ -134,6 +172,9 @@ namespace icinga
Shared>>::Ptr GetResultOfQuery;
Shared>>::Ptr GetResultsOfQueries;
std::function Callback;
+
+ double CTime;
+ QueryAffects Affects;
};
typedef boost::asio::ip::tcp Tcp;
@@ -175,6 +216,7 @@ namespace icinga
void IncreasePendingQueries(int count);
void DecreasePendingQueries(int count);
+ void RecordAffected(QueryAffects affected, double when);
template
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
@@ -225,7 +267,10 @@ namespace icinga
// Stats
RingBuffer m_InputQueries{10};
- RingBuffer m_OutputQueries{10};
+ RingBuffer m_OutputQueries{15 * 60};
+ RingBuffer m_WrittenConfig{15 * 60};
+ RingBuffer m_WrittenState{15 * 60};
+ RingBuffer m_WrittenHistory{15 * 60};
int m_PendingQueries{0};
boost::asio::deadline_timer m_LogStatsTimer;
Ptr m_Parent;
@@ -555,12 +600,12 @@ Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_cont
throw BadRedisInt(std::move(buf));
}
- Array::Ptr arr = new Array();
-
if (i < 0) {
- i = 0;
+ return Empty;
}
+ Array::Ptr arr = new Array();
+
arr->Reserve(i);
for (; i; --i) {