mirror of https://github.com/Icinga/icinga2.git
Introduce Icinga DB check (like the IDO one)
This commit is contained in:
parent
88c8d29ee6
commit
e4a36bc217
|
@ -103,6 +103,24 @@ cluster\_zone | **Required.** The zone name. Defaults to `$host.name$`.
|
|||
cluster\_lag\_warning | **Optional.** Warning threshold for log lag in seconds. Applies if the log lag is greater than the threshold.
|
||||
cluster\_lag\_critical | **Optional.** Critical threshold for log lag in seconds. Applies if the log lag is greater than the threshold.
|
||||
|
||||
### icingadb <a id="itl-icinga-icingadb"></a>
|
||||
|
||||
Check command for the built-in `icingadb` check.
|
||||
|
||||
Custom variables passed as [command parameters](03-monitoring-basics.md#command-passing-parameters):
|
||||
|
||||
Name | Description
|
||||
------------------------------------------|-----------------------------
|
||||
icingadb\_name | **Required.** The name of the Icinga DB connection object. Defaults to `icingadb`.
|
||||
icingadb\_redis\_dump\_takes\_warning | **Optional.** Warning threshold for ongoing Redis dump duration. Applies if the value is higher than the threshold. Defaults to 5 minutes.
|
||||
icingadb\_redis\_dump\_takes\_critical | **Optional.** Critical threshold for ongoing Redis dump duration. Applies if the value is higher than the threshold. Defaults to 10 minutes.
|
||||
icingadb\_database\_sync\_takes\_warning | **Optional.** Warning threshold for ongoing database sync duration. Applies if the value is higher than the threshold. Defaults to 5 minutes.
|
||||
icingadb\_database\_sync\_takes\_critical | **Optional.** Critical threshold for ongoing database sync duration. Applies if the value is higher than the threshold. Defaults to 10 minutes.
|
||||
icingadb\_redis\_backlog\_warning | **Optional.** Warning threshold for Redis write backlog. Applies if the value is higher than the threshold. Defaults to 5 minutes.
|
||||
icingadb\_redis\_backlog\_critical | **Optional.** Critical threshold for Redis write backlog. Applies if the value is higher than the threshold. Defaults to 15 minutes.
|
||||
icingadb\_database\_backlog\_warning | **Optional.** Warning threshold for database sync backlog. Applies if the value is higher than the threshold. Defaults to 5 minutes.
|
||||
icingadb\_database\_backlog\_critical | **Optional.** Critical threshold for database sync backlog. Applies if the value is higher than the threshold. Defaults to 15 minutes.
|
||||
|
||||
### ido <a id="itl-icinga-ido"></a>
|
||||
|
||||
Check command for the built-in `ido` check.
|
||||
|
|
|
@ -2,8 +2,11 @@
|
|||
|
||||
mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp)
|
||||
|
||||
mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp)
|
||||
|
||||
set(icingadb_SOURCES
|
||||
icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp
|
||||
icingadbchecktask.cpp icingadb-itl.cpp
|
||||
)
|
||||
|
||||
if(ICINGA2_UNITY_BUILD)
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
|
||||
|
||||
System.assert(Internal.run_with_activation_context(function() {
|
||||
template CheckCommand "icingadb-check-command" use (checkFunc = Internal.IcingadbCheck) {
|
||||
execute = checkFunc
|
||||
}
|
||||
|
||||
object CheckCommand "icingadb" {
|
||||
import "icingadb-check-command"
|
||||
|
||||
vars.icingadb_name = "icingadb"
|
||||
|
||||
vars.icingadb_redis_dump_takes_warning = 5m
|
||||
vars.icingadb_redis_dump_takes_critical = 10m
|
||||
vars.icingadb_database_sync_takes_warning = 5m
|
||||
vars.icingadb_database_sync_takes_critical = 10m
|
||||
vars.icingadb_redis_backlog_warning = 5m
|
||||
vars.icingadb_redis_backlog_critical = 15m
|
||||
vars.icingadb_database_backlog_warning = 5m
|
||||
vars.icingadb_database_backlog_critical = 15m
|
||||
}
|
||||
}))
|
||||
|
||||
Internal.remove("IcingadbCheck")
|
|
@ -171,6 +171,12 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
Log(LogInformation, "IcingaDB") << "Starting initial config/status dump";
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
SetOngoingDumpStart(startTime);
|
||||
|
||||
Defer resetOngoingDumpStart ([this]() {
|
||||
SetOngoingDumpStart(0);
|
||||
});
|
||||
|
||||
// Use a Workqueue to pack objects in parallel
|
||||
WorkQueue upq(25000, Configuration::Concurrency, LogNotice);
|
||||
upq.SetName("IcingaDB:ConfigDump");
|
||||
|
@ -402,6 +408,8 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
auto ourEnd (ourCheckSums.end());
|
||||
|
||||
auto flushSets ([&]() {
|
||||
auto affectedConfig (setObject.size() / 2u);
|
||||
|
||||
setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum});
|
||||
setObject.insert(setObject.begin(), {"HMSET", configObject});
|
||||
|
||||
|
@ -415,10 +423,12 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
setChecksum.clear();
|
||||
setObject.clear();
|
||||
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
|
||||
});
|
||||
|
||||
auto flushDels ([&]() {
|
||||
auto affectedConfig (delObject.size());
|
||||
|
||||
delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum});
|
||||
delObject.insert(delObject.begin(), {"HDEL", configObject});
|
||||
|
||||
|
@ -432,7 +442,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
delChecksum.clear();
|
||||
delObject.clear();
|
||||
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
|
||||
});
|
||||
|
||||
auto setOne ([&]() {
|
||||
|
@ -524,8 +534,14 @@ void IcingaDB::UpdateAllConfigObjects()
|
|||
m_Rcon->EnqueueCallback([&p](boost::asio::yield_context& yc) { p.set_value(); }, Prio::Config);
|
||||
p.get_future().wait();
|
||||
|
||||
auto endTime (Utility::GetTime());
|
||||
auto took (endTime - startTime);
|
||||
|
||||
SetLastdumpTook(took);
|
||||
SetLastdumpEnd(endTime);
|
||||
|
||||
Log(LogInformation, "IcingaDB")
|
||||
<< "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
|
||||
<< "Initial config/status dump finished in " << took << " seconds.";
|
||||
}
|
||||
|
||||
std::vector<std::vector<intrusive_ptr<ConfigObject>>> IcingaDB::ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize) {
|
||||
|
@ -1131,7 +1147,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode)
|
|||
streamadd.emplace_back(IcingaToStreamValue(kv.second));
|
||||
}
|
||||
|
||||
m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream);
|
||||
m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream, {0, 1});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1178,7 +1194,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
|
|||
|
||||
if (transaction.size() > 1) {
|
||||
transaction.push_back({"EXEC"});
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
|
||||
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
|
||||
}
|
||||
|
||||
if (checkable) {
|
||||
|
@ -2343,7 +2359,7 @@ void IcingaDB::ForwardHistoryEntries()
|
|||
|
||||
if (m_Rcon && m_Rcon->IsConnected()) {
|
||||
try {
|
||||
m_Rcon->GetResultsOfQueries(haystack, Prio::History);
|
||||
m_Rcon->GetResultsOfQueries(haystack, Prio::History, {0, 0, haystack.size()});
|
||||
break;
|
||||
} catch (const std::exception& ex) {
|
||||
logFailure(ex.what());
|
||||
|
|
|
@ -44,6 +44,11 @@ public:
|
|||
|
||||
String GetEnvironmentId() const override;
|
||||
|
||||
inline RedisConnection::Ptr GetConnection()
|
||||
{
|
||||
return m_Rcon;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
static void AddKvsToMap(const Array::Ptr& kvs, T& map)
|
||||
{
|
||||
|
|
|
@ -48,6 +48,16 @@ class IcingaDB : ConfigObject
|
|||
[no_storage] String environment_id {
|
||||
get;
|
||||
};
|
||||
|
||||
[set_protected] double ongoing_dump_start {
|
||||
default {{{ return 0; }}}
|
||||
};
|
||||
[state, set_protected] double lastdump_end {
|
||||
default {{{ return 0; }}}
|
||||
};
|
||||
[state, set_protected] double lastdump_took {
|
||||
default {{{ return 0; }}}
|
||||
};
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,519 @@
|
|||
/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
|
||||
|
||||
#include "icingadb/icingadbchecktask.hpp"
|
||||
#include "icinga/host.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "remote/apilistener.hpp"
|
||||
#include "remote/endpoint.hpp"
|
||||
#include "remote/zone.hpp"
|
||||
#include "base/function.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include <utility>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
REGISTER_FUNCTION_NONCONST(Internal, IcingadbCheck, &IcingadbCheckTask::ScriptFunc, "checkable:cr:resolvedMacros:useResolvedMacros");
|
||||
|
||||
static void ReportIcingadbCheck(
|
||||
const Checkable::Ptr& checkable, const CheckCommand::Ptr& commandObj,
|
||||
const CheckResult::Ptr& cr, String output, ServiceState state)
|
||||
{
|
||||
if (Checkable::ExecuteCommandProcessFinishedHandler) {
|
||||
double now = Utility::GetTime();
|
||||
ProcessResult pr;
|
||||
pr.PID = -1;
|
||||
pr.Output = std::move(output);
|
||||
pr.ExecutionStart = now;
|
||||
pr.ExecutionEnd = now;
|
||||
pr.ExitStatus = state;
|
||||
|
||||
Checkable::ExecuteCommandProcessFinishedHandler(commandObj->GetName(), pr);
|
||||
} else {
|
||||
cr->SetState(state);
|
||||
cr->SetOutput(output);
|
||||
checkable->ProcessCheckResult(cr);
|
||||
}
|
||||
}
|
||||
|
||||
static inline
|
||||
double GetXMessageTs(const Array::Ptr& xMessage)
|
||||
{
|
||||
return Convert::ToLong(String(xMessage->Get(0)).Split("-")[0]) / 1000.0;
|
||||
}
|
||||
|
||||
void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
|
||||
const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros)
|
||||
{
|
||||
CheckCommand::Ptr commandObj = CheckCommand::ExecuteOverride ? CheckCommand::ExecuteOverride : checkable->GetCheckCommand();
|
||||
|
||||
Host::Ptr host;
|
||||
Service::Ptr service;
|
||||
tie(host, service) = GetHostService(checkable);
|
||||
|
||||
MacroProcessor::ResolverList resolvers;
|
||||
String silenceMissingMacroWarning;
|
||||
|
||||
if (MacroResolver::OverrideMacros)
|
||||
resolvers.emplace_back("override", MacroResolver::OverrideMacros);
|
||||
|
||||
if (service)
|
||||
resolvers.emplace_back("service", service);
|
||||
resolvers.emplace_back("host", host);
|
||||
resolvers.emplace_back("command", commandObj);
|
||||
resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
|
||||
|
||||
auto resolve ([&](const String& macro) {
|
||||
return MacroProcessor::ResolveMacros(macro, resolvers, checkable->GetLastCheckResult(),
|
||||
&silenceMissingMacroWarning, MacroProcessor::EscapeCallback(), resolvedMacros, useResolvedMacros);
|
||||
});
|
||||
|
||||
struct Thresholds
|
||||
{
|
||||
Value Warning, Critical;
|
||||
};
|
||||
|
||||
auto resolveThresholds ([&resolve](const String& wmacro, const String& cmacro) {
|
||||
return Thresholds{resolve(wmacro), resolve(cmacro)};
|
||||
});
|
||||
|
||||
String icingadbName = resolve("$icingadb_name$");
|
||||
|
||||
auto dumpTakesThresholds (resolveThresholds("$icingadb_redis_dump_takes_warning$", "$icingadb_redis_dump_takes_critical$"));
|
||||
auto syncTakesThresholds (resolveThresholds("$icingadb_database_sync_takes_warning$", "$icingadb_database_sync_takes_critical$"));
|
||||
auto icingaBacklogThresholds (resolveThresholds("$icingadb_redis_backlog_warning$", "$icingadb_redis_backlog_critical$"));
|
||||
auto icingadbBacklogThresholds (resolveThresholds("$icingadb_database_backlog_warning$", "$icingadb_database_backlog_critical$"));
|
||||
|
||||
if (resolvedMacros && !useResolvedMacros)
|
||||
return;
|
||||
|
||||
if (icingadbName.IsEmpty()) {
|
||||
ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Attribute 'icingadb_name' must be set.", ServiceUnknown);
|
||||
return;
|
||||
}
|
||||
|
||||
auto conn (IcingaDB::GetByName(icingadbName));
|
||||
|
||||
if (!conn) {
|
||||
ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Icinga DB connection '" + icingadbName + "' does not exist.", ServiceUnknown);
|
||||
return;
|
||||
}
|
||||
|
||||
auto redis (conn->GetConnection());
|
||||
|
||||
if (!redis->GetConnected()) {
|
||||
ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB CRITICAL: Could not connect to Redis.", ServiceCritical);
|
||||
return;
|
||||
}
|
||||
|
||||
auto now (Utility::GetTime());
|
||||
Array::Ptr redisTime, xReadHeartbeat, xReadStats, xReadRtuHistory;
|
||||
|
||||
try {
|
||||
auto replies (redis->GetResultsOfQueries(
|
||||
{
|
||||
{"TIME"},
|
||||
{"XREAD", "STREAMS", "icingadb:telemetry:heartbeat", "0-0"},
|
||||
{"XREAD", "STREAMS", "icingadb:telemetry:stats", "0-0"},
|
||||
{
|
||||
"XREAD", "COUNT", "1", "STREAMS",
|
||||
"icinga:runtime", "icinga:runtime:state",
|
||||
"icinga:history:stream:acknowledgement", "icinga:history:stream:comment",
|
||||
"icinga:history:stream:downtime", "icinga:history:stream:flapping",
|
||||
"icinga:history:stream:notification", "icinga:history:stream:state",
|
||||
"0-0", "0-0", "0-0", "0-0", "0-0", "0-0", "0-0", "0-0"
|
||||
}
|
||||
},
|
||||
RedisConnection::QueryPriority::Heartbeat
|
||||
));
|
||||
|
||||
redisTime = std::move(replies.at(0));
|
||||
xReadHeartbeat = std::move(replies.at(1));
|
||||
xReadStats = std::move(replies.at(2));
|
||||
xReadRtuHistory = std::move(replies.at(3));
|
||||
} catch (const std::exception& ex) {
|
||||
ReportIcingadbCheck(
|
||||
checkable, commandObj, cr,
|
||||
String("Icinga DB CRITICAL: Could not read XREAD responses from Redis: ") + ex.what(), ServiceCritical
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!xReadHeartbeat) {
|
||||
ReportIcingadbCheck(
|
||||
checkable, commandObj, cr,
|
||||
"Icinga DB CRITICAL: The Icinga DB daemon seems to have never run. (Missing heartbeat)",
|
||||
ServiceCritical
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
auto redisOldestPending (redis->GetOldestPendingQueryTs());
|
||||
auto ongoingDumpStart (conn->GetOngoingDumpStart());
|
||||
auto dumpWhen (conn->GetLastdumpEnd());
|
||||
auto dumpTook (conn->GetLastdumpTook());
|
||||
|
||||
auto redisNow (Convert::ToLong(redisTime->Get(0)) + Convert::ToLong(redisTime->Get(1)) / 1000000.0);
|
||||
Array::Ptr heartbeatMessage = Array::Ptr(Array::Ptr(xReadHeartbeat->Get(0))->Get(1))->Get(0);
|
||||
auto heartbeatTime (GetXMessageTs(heartbeatMessage));
|
||||
std::map<String, String> heartbeatData;
|
||||
|
||||
IcingaDB::AddKvsToMap(heartbeatMessage->Get(1), heartbeatData);
|
||||
|
||||
String version = heartbeatData.at("general:version");
|
||||
auto icingadbNow (Convert::ToLong(heartbeatData.at("general:time")) / 1000.0 + (redisNow - heartbeatTime));
|
||||
auto icingadbStartTime (Convert::ToLong(heartbeatData.at("general:start-time")) / 1000.0);
|
||||
String errMsg (heartbeatData.at("general:err"));
|
||||
auto errSince (Convert::ToLong(heartbeatData.at("general:err-since")) / 1000.0);
|
||||
Dictionary::Ptr goMetricsByCumulativity (JsonDecode(heartbeatData.at("go:metrics")));
|
||||
auto heartbeatLastReceived (Convert::ToLong(heartbeatData.at("heartbeat:last-received")) / 1000.0);
|
||||
bool weResponsible = Convert::ToLong(heartbeatData.at("ha:responsible"));
|
||||
auto weResponsibleTs (Convert::ToLong(heartbeatData.at("ha:responsible-ts")) / 1000.0);
|
||||
bool otherResponsible = Convert::ToLong(heartbeatData.at("ha:other-responsible"));
|
||||
auto syncOngoingSince (Convert::ToLong(heartbeatData.at("sync:ongoing-since")) / 1000.0);
|
||||
auto syncSuccessWhen (Convert::ToLong(heartbeatData.at("sync:success-finish")) / 1000.0);
|
||||
auto syncSuccessTook (Convert::ToLong(heartbeatData.at("sync:success-duration")) / 1000.0);
|
||||
|
||||
std::ostringstream i2okmsgs, idbokmsgs, warnmsgs, critmsgs;
|
||||
Array::Ptr perfdata = new Array();
|
||||
|
||||
i2okmsgs << std::fixed << std::setprecision(3);
|
||||
idbokmsgs << std::fixed << std::setprecision(3);
|
||||
warnmsgs << std::fixed << std::setprecision(3);
|
||||
critmsgs << std::fixed << std::setprecision(3);
|
||||
|
||||
const auto downForCritical (10);
|
||||
auto downFor (redisNow - heartbeatTime);
|
||||
bool down = false;
|
||||
|
||||
if (downFor > downForCritical) {
|
||||
down = true;
|
||||
|
||||
critmsgs << " Last seen " << Utility::FormatDuration(downFor)
|
||||
<< " ago, greater than CRITICAL threshold (" << Utility::FormatDuration(downForCritical) << ")!";
|
||||
} else {
|
||||
idbokmsgs << "\n* Last seen: " << Utility::FormatDuration(downFor) << " ago";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("icingadb_heartbeat_age", downFor, false, "seconds", Empty, downForCritical, 0));
|
||||
|
||||
const auto errForCritical (10);
|
||||
auto err (!errMsg.IsEmpty());
|
||||
auto errFor (icingadbNow - errSince);
|
||||
|
||||
if (err) {
|
||||
if (errFor > errForCritical) {
|
||||
critmsgs << " ERROR: " << errMsg << "!";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("err_for", errFor * (err ? 1 : -1), false, "seconds", Empty, errForCritical, 0));
|
||||
}
|
||||
|
||||
if (!down) {
|
||||
const auto heartbeatLagWarning (3/* Icinga DB read freq. */ + 1/* Icinga DB write freq. */ + 2/* threshold */);
|
||||
auto heartbeatLag (fmin(icingadbNow - heartbeatLastReceived, 10 * 60));
|
||||
|
||||
if (!heartbeatLastReceived) {
|
||||
critmsgs << " Lost Icinga 2 heartbeat!";
|
||||
} else if (heartbeatLag > heartbeatLagWarning) {
|
||||
warnmsgs << " Icinga 2 heartbeat lag: " << Utility::FormatDuration(heartbeatLag)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(heartbeatLagWarning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("icinga_heartbeat_lag", heartbeatLag, false, "seconds", heartbeatLagWarning, Empty, 0));
|
||||
}
|
||||
|
||||
if (weResponsible) {
|
||||
idbokmsgs << "\n* Responsible";
|
||||
} else if (otherResponsible) {
|
||||
idbokmsgs << "\n* Not responsible, but another instance is";
|
||||
} else {
|
||||
critmsgs << " No instance is responsible!";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("icingadb_responsible_instances", int(weResponsible || otherResponsible), false, "", Empty, Empty, 0, 1));
|
||||
|
||||
const auto clockDriftWarning (5);
|
||||
const auto clockDriftCritical (30);
|
||||
auto clockDrift (fmax(fabs(now - redisNow), fmax(fabs(redisNow - icingadbNow), fabs(icingadbNow - now))));
|
||||
|
||||
if (clockDrift > clockDriftCritical) {
|
||||
critmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift)
|
||||
<< ", greater than CRITICAL threshold (" << Utility::FormatDuration(clockDriftCritical) << ")!";
|
||||
} else if (clockDrift > clockDriftWarning) {
|
||||
warnmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(clockDriftWarning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("clock_drift", clockDrift, false, "seconds", clockDriftWarning, clockDriftCritical, 0));
|
||||
|
||||
if (ongoingDumpStart) {
|
||||
auto ongoingDumpTakes (now - ongoingDumpStart);
|
||||
|
||||
if (!dumpTakesThresholds.Critical.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Critical) {
|
||||
critmsgs << " Ongoing Icinga 2 dump already takes " << Utility::FormatDuration(ongoingDumpTakes)
|
||||
<< ", greater than CRITICAL threshold (" << Utility::FormatDuration(dumpTakesThresholds.Critical) << ")!";
|
||||
} else if (!dumpTakesThresholds.Warning.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Warning) {
|
||||
warnmsgs << " Ongoing Icinga 2 dump already takes " << Utility::FormatDuration(ongoingDumpTakes)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(dumpTakesThresholds.Warning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("redis_dump_takes", ongoingDumpTakes, false, "seconds",
|
||||
dumpTakesThresholds.Warning, dumpTakesThresholds.Critical, 0));
|
||||
}
|
||||
|
||||
if (!down && syncOngoingSince) {
|
||||
auto ongoingSyncTakes (icingadbNow - syncOngoingSince);
|
||||
|
||||
if (!syncTakesThresholds.Critical.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Critical) {
|
||||
critmsgs << " Ongoing sync already takes " << Utility::FormatDuration(ongoingSyncTakes)
|
||||
<< ", greater than CRITICAL threshold (" << Utility::FormatDuration(syncTakesThresholds.Critical) << ")!";
|
||||
} else if (!syncTakesThresholds.Warning.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Warning) {
|
||||
warnmsgs << " Ongoing sync already takes " << Utility::FormatDuration(ongoingSyncTakes)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(syncTakesThresholds.Warning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("database_sync_takes", ongoingSyncTakes, false, "seconds",
|
||||
syncTakesThresholds.Warning, syncTakesThresholds.Critical, 0));
|
||||
}
|
||||
|
||||
auto redisBacklog (now - redisOldestPending);
|
||||
|
||||
if (!redisOldestPending) {
|
||||
redisBacklog = 0;
|
||||
}
|
||||
|
||||
if (!icingaBacklogThresholds.Critical.IsEmpty() && redisBacklog > icingaBacklogThresholds.Critical) {
|
||||
critmsgs << " Icinga 2 query backlog: " << Utility::FormatDuration(redisBacklog)
|
||||
<< ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Critical) << ")!";
|
||||
} else if (!icingaBacklogThresholds.Warning.IsEmpty() && redisBacklog > icingaBacklogThresholds.Warning) {
|
||||
warnmsgs << " Icinga 2 query backlog: " << Utility::FormatDuration(redisBacklog)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Warning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("redis_backlog", redisBacklog, false, "seconds",
|
||||
icingaBacklogThresholds.Warning, icingaBacklogThresholds.Critical, 0));
|
||||
|
||||
if (!down) {
|
||||
double icingadbBacklog = 0;
|
||||
|
||||
if (xReadRtuHistory) {
|
||||
double minTs = 0;
|
||||
ObjectLock lock (xReadRtuHistory);
|
||||
|
||||
for (Array::Ptr stream : xReadRtuHistory) {
|
||||
if (!weResponsible) {
|
||||
String name = stream->Get(0);
|
||||
|
||||
if (name == "icinga:runtime" || name == "icinga:runtime:state") {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
auto ts (GetXMessageTs(Array::Ptr(stream->Get(1))->Get(0)));
|
||||
|
||||
if (minTs == 0 || ts < minTs) {
|
||||
minTs = ts;
|
||||
}
|
||||
}
|
||||
|
||||
if (minTs > 0) {
|
||||
icingadbBacklog = redisNow - minTs;
|
||||
}
|
||||
}
|
||||
|
||||
if (!icingadbBacklogThresholds.Critical.IsEmpty() && icingadbBacklog > icingadbBacklogThresholds.Critical) {
|
||||
critmsgs << " Query backlog: " << Utility::FormatDuration(icingadbBacklog)
|
||||
<< ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Critical) << ")!";
|
||||
} else if (!icingadbBacklogThresholds.Warning.IsEmpty() && icingadbBacklog > icingadbBacklogThresholds.Warning) {
|
||||
warnmsgs << " Query backlog: " << Utility::FormatDuration(icingadbBacklog)
|
||||
<< ", greater than WARNING threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Warning) << ").";
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("database_backlog", icingadbBacklog, false, "seconds",
|
||||
icingadbBacklogThresholds.Warning, icingadbBacklogThresholds.Critical, 0));
|
||||
}
|
||||
|
||||
auto dumpAgo (now - dumpWhen);
|
||||
|
||||
if (dumpWhen) {
|
||||
perfdata->Add(new PerfdataValue("redis_dump_ago", dumpAgo, false, "seconds", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
if (dumpTook) {
|
||||
perfdata->Add(new PerfdataValue("redis_dump_took", dumpTook, false, "seconds", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
if (dumpWhen && dumpTook) {
|
||||
i2okmsgs << "\n* Last dump: " << Utility::FormatDuration(dumpAgo)
|
||||
<< " ago, took " << Utility::FormatDuration(dumpTook);
|
||||
}
|
||||
|
||||
auto icingadbUptime (icingadbNow - icingadbStartTime);
|
||||
|
||||
if (!down) {
|
||||
perfdata->Add(new PerfdataValue("icingadb_uptime", icingadbUptime, false, "seconds", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
{
|
||||
static boost::regex wellNamedUnits (":(bytes|seconds)$");
|
||||
ObjectLock lock (goMetricsByCumulativity);
|
||||
|
||||
for (auto& kv : goMetricsByCumulativity) {
|
||||
bool cumulative = kv.first == "cumulative";
|
||||
Dictionary::Ptr goMetricsPerCumulativity = kv.second;
|
||||
ObjectLock lock (goMetricsPerCumulativity);
|
||||
|
||||
for (auto& kv : goMetricsPerCumulativity) {
|
||||
std::string unit;
|
||||
boost::smatch what;
|
||||
|
||||
if (boost::regex_search(kv.first.GetData(), what, wellNamedUnits)) {
|
||||
unit = what[1];
|
||||
}
|
||||
|
||||
bool counter = cumulative && unit.empty();
|
||||
auto label ("go" + kv.first);
|
||||
|
||||
for (auto& c : label) {
|
||||
if (!('a' <= c && c <= 'z' || 'A' <= c && c <= 'Z' || '0' <= c && c <= '9')) {
|
||||
c = '_';
|
||||
}
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue(std::move(label), kv.second, counter, std::move(unit)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (weResponsibleTs) {
|
||||
perfdata->Add(new PerfdataValue("icingadb_responsible_for",
|
||||
(weResponsible ? 1 : -1) * (icingadbNow - weResponsibleTs), false, "seconds"));
|
||||
}
|
||||
|
||||
auto syncAgo (icingadbNow - syncSuccessWhen);
|
||||
|
||||
if (syncSuccessWhen) {
|
||||
perfdata->Add(new PerfdataValue("database_sync_ago", syncAgo, false, "seconds", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
if (syncSuccessTook) {
|
||||
perfdata->Add(new PerfdataValue("database_sync_took", syncSuccessTook, false, "seconds", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
if (syncSuccessWhen && syncSuccessTook) {
|
||||
idbokmsgs << "\n* Last sync: " << Utility::FormatDuration(syncAgo)
|
||||
<< " ago, took " << Utility::FormatDuration(syncSuccessTook);
|
||||
}
|
||||
|
||||
std::map<String, RingBuffer> statsPerOp;
|
||||
|
||||
const char * const icingadbKnownStats[] = {
|
||||
"sync_config", "sync_state", "sync_history", "sync_overdue", "cleanup_history"
|
||||
};
|
||||
|
||||
for (auto metric : icingadbKnownStats) {
|
||||
statsPerOp.emplace(std::piecewise_construct, std::forward_as_tuple(metric), std::forward_as_tuple(15 * 60));
|
||||
}
|
||||
|
||||
if (xReadStats) {
|
||||
Array::Ptr messages = Array::Ptr(xReadStats->Get(0))->Get(1);
|
||||
ObjectLock lock (messages);
|
||||
|
||||
for (Array::Ptr message : messages) {
|
||||
auto ts (GetXMessageTs(message));
|
||||
std::map<String, String> opsPerSec;
|
||||
|
||||
IcingaDB::AddKvsToMap(message->Get(1), opsPerSec);
|
||||
|
||||
for (auto& kv : opsPerSec) {
|
||||
auto buf (statsPerOp.find(kv.first));
|
||||
|
||||
if (buf == statsPerOp.end()) {
|
||||
buf = statsPerOp.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(kv.first), std::forward_as_tuple(15 * 60)
|
||||
).first;
|
||||
}
|
||||
|
||||
buf->second.InsertValue(ts, Convert::ToLong(kv.second));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& kv : statsPerOp) {
|
||||
auto perMin (kv.second.UpdateAndGetValues(now, 60));
|
||||
|
||||
perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_1sec", perMin / 60.0, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_1min", perMin, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_5mins", kv.second.UpdateAndGetValues(now, 5 * 60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_15mins", kv.second.UpdateAndGetValues(now, 15 * 60), false, "", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
auto queriesPerMin (redis->GetQueryCount(60));
|
||||
|
||||
perfdata->Add(new PerfdataValue("redis_queries_1sec", queriesPerMin / 60.0, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("redis_queries_1min", queriesPerMin, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("redis_queries_5mins", redis->GetQueryCount(5 * 60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("redis_queries_15mins", redis->GetQueryCount(15 * 60), false, "", Empty, Empty, 0));
|
||||
|
||||
perfdata->Add(new PerfdataValue("redis_pending_queries", redis->GetPendingQueryCount(), false, "", Empty, Empty, 0));
|
||||
|
||||
struct {
|
||||
const char * Name;
|
||||
int (RedisConnection::* Getter)(RingBuffer::SizeType span, RingBuffer::SizeType tv);
|
||||
} const icingaWriteSubjects[] = {
|
||||
{"icinga_dump_config", &RedisConnection::GetWrittenConfigFor},
|
||||
{"icinga_dump_state", &RedisConnection::GetWrittenStateFor},
|
||||
{"icinga_dump_history", &RedisConnection::GetWrittenHistoryFor}
|
||||
};
|
||||
|
||||
for (auto subject : icingaWriteSubjects) {
|
||||
auto perMin ((redis.get()->*subject.Getter)(60, now));
|
||||
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_1sec", perMin / 60.0, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_1min", perMin, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_5mins", (redis.get()->*subject.Getter)(5 * 60, now), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_15mins", (redis.get()->*subject.Getter)(15 * 60, now), false, "", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
ServiceState state;
|
||||
std::ostringstream msgbuf;
|
||||
auto i2okmsg (i2okmsgs.str());
|
||||
auto idbokmsg (idbokmsgs.str());
|
||||
auto warnmsg (warnmsgs.str());
|
||||
auto critmsg (critmsgs.str());
|
||||
|
||||
msgbuf << "Icinga DB ";
|
||||
|
||||
if (!critmsg.empty()) {
|
||||
state = ServiceCritical;
|
||||
msgbuf << "CRITICAL:" << critmsg;
|
||||
|
||||
if (!warnmsg.empty()) {
|
||||
msgbuf << "\n\nWARNING:" << warnmsg;
|
||||
}
|
||||
} else if (!warnmsg.empty()) {
|
||||
state = ServiceWarning;
|
||||
msgbuf << "WARNING:" << warnmsg;
|
||||
} else {
|
||||
state = ServiceOK;
|
||||
msgbuf << "OK: Uptime: " << Utility::FormatDuration(icingadbUptime) << ". Version: " << version << ".";
|
||||
}
|
||||
|
||||
if (!i2okmsg.empty()) {
|
||||
msgbuf << "\n\nIcinga 2\n--------\n" << i2okmsg;
|
||||
}
|
||||
|
||||
if (!idbokmsg.empty()) {
|
||||
msgbuf << "\n\nIcinga DB\n---------\n" << idbokmsg;
|
||||
}
|
||||
|
||||
cr->SetPerformanceData(perfdata);
|
||||
ReportIcingadbCheck(checkable, commandObj, cr, msgbuf.str(), state);
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */
|
||||
|
||||
#ifndef ICINGADBCHECKTASK_H
|
||||
#define ICINGADBCHECKTASK_H
|
||||
|
||||
#include "icingadb/icingadb.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* Icinga DB check.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
class IcingadbCheckTask
|
||||
{
|
||||
public:
|
||||
static void ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
|
||||
const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros);
|
||||
|
||||
private:
|
||||
IcingadbCheckTask();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* ICINGADBCHECKTASK_H */
|
|
@ -113,7 +113,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
|
|||
* @param query Redis query
|
||||
* @param priority The query's priority
|
||||
*/
|
||||
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
|
||||
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
{
|
||||
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
|
||||
|
@ -121,9 +121,10 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
|
|||
}
|
||||
|
||||
auto item (Shared<Query>::Make(std::move(query)));
|
||||
auto ctime (Utility::GetTime());
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, ctime, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(1);
|
||||
});
|
||||
|
@ -135,7 +136,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
|
|||
* @param queries Redis queries
|
||||
* @param priority The queries' priority
|
||||
*/
|
||||
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
|
||||
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
for (auto& query : queries) {
|
||||
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
|
||||
|
@ -143,9 +144,10 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
|
|||
}
|
||||
|
||||
auto item (Shared<Queries>::Make(std::move(queries)));
|
||||
auto ctime (Utility::GetTime());
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, ctime, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(item->size());
|
||||
});
|
||||
|
@ -159,7 +161,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
|
|||
*
|
||||
* @return The response
|
||||
*/
|
||||
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
|
||||
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
{
|
||||
Log msg (LogDebug, "IcingaDB", "Executing query:");
|
||||
|
@ -169,9 +171,10 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
|||
std::promise<Reply> promise;
|
||||
auto future (promise.get_future());
|
||||
auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
|
||||
auto ctime (Utility::GetTime());
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, ctime, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(1);
|
||||
});
|
||||
|
@ -189,7 +192,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
|||
*
|
||||
* @return The responses
|
||||
*/
|
||||
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
|
||||
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
for (auto& query : queries) {
|
||||
Log msg (LogDebug, "IcingaDB", "Executing query:");
|
||||
|
@ -199,9 +202,10 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
|
|||
std::promise<Replies> promise;
|
||||
auto future (promise.get_future());
|
||||
auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
|
||||
auto ctime (Utility::GetTime());
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
|
||||
asio::post(m_Strand, [this, item, priority, ctime, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(item->first.size());
|
||||
});
|
||||
|
@ -213,8 +217,10 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
|
|||
|
||||
void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority)
|
||||
{
|
||||
asio::post(m_Strand, [this, callback, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback});
|
||||
auto ctime (Utility::GetTime());
|
||||
|
||||
asio::post(m_Strand, [this, callback, priority, ctime]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
|
||||
m_QueuedWrites.Set();
|
||||
});
|
||||
}
|
||||
|
@ -230,6 +236,36 @@ void RedisConnection::Sync()
|
|||
GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the enqueue time of the oldest still queued Redis query
|
||||
*
|
||||
* @return *nix timestamp or 0
|
||||
*/
|
||||
double RedisConnection::GetOldestPendingQueryTs()
|
||||
{
|
||||
auto promise (Shared<std::promise<double>>::Make());
|
||||
auto future (promise->get_future());
|
||||
|
||||
asio::post(m_Strand, [this, promise]() {
|
||||
double oldest = 0;
|
||||
|
||||
for (auto& queue : m_Queues.Writes) {
|
||||
if (!queue.second.empty()) {
|
||||
auto ctime (queue.second.front().CTime);
|
||||
|
||||
if (ctime < oldest || oldest == 0) {
|
||||
oldest = ctime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
promise->set_value(oldest);
|
||||
});
|
||||
|
||||
future.wait();
|
||||
return future.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark kind as kind of queries not to actually send yet
|
||||
*
|
||||
|
@ -625,6 +661,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
|||
if (next.Callback) {
|
||||
next.Callback(yc);
|
||||
}
|
||||
|
||||
RecordAffected(next.Affects, Utility::GetTime());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -674,6 +712,11 @@ void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_contex
|
|||
m_ConnectedCallback = std::move(callback);
|
||||
}
|
||||
|
||||
/**
 * Get the amount of Redis queries sent within the given trailing time window.
 *
 * @param span Length of the look-back window in seconds
 *
 * @return Number of queries accounted in m_OutputQueries for that window
 */
int RedisConnection::GetQueryCount(RingBuffer::SizeType span)
{
	auto now (Utility::GetTime());

	return m_OutputQueries.UpdateAndGetValues(now, span);
}
|
||||
|
||||
void RedisConnection::IncreasePendingQueries(int count)
|
||||
{
|
||||
if (m_Parent) {
|
||||
|
@ -701,3 +744,26 @@ void RedisConnection::DecreasePendingQueries(int count)
|
|||
m_OutputQueries.InsertValue(Utility::GetTime(), count);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Account the config, state and history items written by a completed query.
 *
 * A child connection forwards the numbers to its parent (posted onto the
 * parent's strand); the top-level connection records them in its ring buffers.
 *
 * @param affected Amounts of config, state and history items written
 * @param when *nix timestamp the numbers are accounted for
 */
void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when)
{
	if (m_Parent) {
		// Delegate to the parent on its own strand so its ring buffers are
		// only ever touched from one executor.
		auto parent (m_Parent);

		asio::post(parent->m_Strand, [parent, affected, when]() {
			parent->RecordAffected(affected, when);
		});

		return;
	}

	if (affected.Config) {
		m_WrittenConfig.InsertValue(when, affected.Config);
	}

	if (affected.State) {
		m_WrittenState.InsertValue(when, affected.State);
	}

	if (affected.History) {
		m_WrittenHistory.InsertValue(when, affected.History);
	}
}
|
||||
|
|
|
@ -74,6 +74,16 @@ namespace icinga
|
|||
SyncConnection = 255
|
||||
};
|
||||
|
||||
/**
 * Describes how many config, state and history items a Redis query writes,
 * so the written amounts can be accounted in the connection statistics.
 */
struct QueryAffects
{
	size_t Config;
	size_t State;
	size_t History;

	// All counts default to 0, so `QueryAffects{}` means "affects nothing".
	QueryAffects(size_t config = 0, size_t state = 0, size_t history = 0)
		: Config(config), State(state), History(history)
	{
	}
};
|
||||
|
||||
RedisConnection(const String& host, int port, const String& path, const String& password, int db,
|
||||
bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
|
||||
const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr);
|
||||
|
@ -84,20 +94,48 @@ namespace icinga
|
|||
|
||||
bool IsConnected();
|
||||
|
||||
void FireAndForgetQuery(Query query, QueryPriority priority);
|
||||
void FireAndForgetQueries(Queries queries, QueryPriority priority);
|
||||
void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {});
|
||||
void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
|
||||
|
||||
Reply GetResultOfQuery(Query query, QueryPriority priority);
|
||||
Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
|
||||
Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {});
|
||||
Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
|
||||
|
||||
void EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, QueryPriority priority);
|
||||
void Sync();
|
||||
double GetOldestPendingQueryTs();
|
||||
|
||||
void SuppressQueryKind(QueryPriority kind);
|
||||
void UnsuppressQueryKind(QueryPriority kind);
|
||||
|
||||
void SetConnectedCallback(std::function<void(boost::asio::yield_context& yc)> callback);
|
||||
|
||||
/// Whether the connection to Redis is currently established.
/// Reads an atomic flag (m_Connected), so it may be called from any thread.
inline bool GetConnected()
{
	return m_Connected.load();
}
|
||||
|
||||
int GetQueryCount(RingBuffer::SizeType span);
|
||||
|
||||
/// Number of queries enqueued but not yet answered.
/// NOTE(review): m_PendingQueries is a plain int read here without
/// synchronization — presumably monitoring-only; confirm cross-thread
/// access is acceptable.
inline int GetPendingQueryCount()
{
	return m_PendingQueries;
}
|
||||
|
||||
/// Amount of config items written to Redis during the last span seconds.
/// tv defaults to "now" (evaluated per call); passing it explicitly lets
/// callers use one consistent timestamp across the GetWritten*For() getters.
inline int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	return m_WrittenConfig.UpdateAndGetValues(tv, span);
}
|
||||
|
||||
/// Amount of state items written to Redis during the last span seconds.
/// tv defaults to "now" (evaluated per call); see GetWrittenConfigFor().
inline int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	return m_WrittenState.UpdateAndGetValues(tv, span);
}
|
||||
|
||||
/// Amount of history items written to Redis during the last span seconds.
/// tv defaults to "now" (evaluated per call); see GetWrittenConfigFor().
inline int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	return m_WrittenHistory.UpdateAndGetValues(tv, span);
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* What to do with the responses to Redis queries.
|
||||
|
@ -134,6 +172,9 @@ namespace icinga
|
|||
Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery;
|
||||
Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries;
|
||||
std::function<void(boost::asio::yield_context&)> Callback;
|
||||
|
||||
double CTime;
|
||||
QueryAffects Affects;
|
||||
};
|
||||
|
||||
typedef boost::asio::ip::tcp Tcp;
|
||||
|
@ -175,6 +216,7 @@ namespace icinga
|
|||
|
||||
void IncreasePendingQueries(int count);
|
||||
void DecreasePendingQueries(int count);
|
||||
void RecordAffected(QueryAffects affected, double when);
|
||||
|
||||
template<class StreamPtr>
|
||||
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
|
||||
|
@ -225,7 +267,10 @@ namespace icinga
|
|||
|
||||
// Stats
|
||||
RingBuffer m_InputQueries{10};
|
||||
RingBuffer m_OutputQueries{10};
|
||||
RingBuffer m_OutputQueries{15 * 60};
|
||||
RingBuffer m_WrittenConfig{15 * 60};
|
||||
RingBuffer m_WrittenState{15 * 60};
|
||||
RingBuffer m_WrittenHistory{15 * 60};
|
||||
int m_PendingQueries{0};
|
||||
boost::asio::deadline_timer m_LogStatsTimer;
|
||||
Ptr m_Parent;
|
||||
|
|
Loading…
Reference in New Issue