Merge pull request #5988 from Icinga/fix/concurrent-checks-limit-cluster

Fix concurrent checks limit while using command_endpoint
This commit is contained in:
Noah Hilverling 2018-02-06 14:13:49 +01:00 committed by GitHub
commit e1e06ce767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 323 additions and 112 deletions

View File

@ -818,6 +818,10 @@ This mode will force the Icinga 2 node to execute commands remotely on a specifi
The host/service object configuration is located on the master/satellite and the client only
needs the CheckCommand object definitions being used there.
Every endpoint has its own remote check queue. The number of checks executed simultaneously
can be limited on the endpoint with the `MaxConcurrentChecks` constant defined in [constants.conf](04-configuring-icinga-2.md#constants-conf). Icinga 2 may discard check requests
if the remote check queue is full.
![Icinga 2 Distributed Top Down Command Endpoint](images/distributed-monitoring/icinga2_distributed_top_down_command_endpoint.png)
Advantages:

View File

@ -247,16 +247,14 @@ This configuration object is available as [checker feature](11-cli-commands.md#c
Example:
```
object CheckerComponent "checker" {
concurrent_checks = 512
}
object CheckerComponent "checker" { }
```
Configuration Attributes:
Name | Type | Description
--------------------------|-----------------------|----------------------------------
concurrent\_checks | Number | **Optional.** The maximum number of concurrent checks. Defaults to 512.
concurrent\_checks | Number | **Optional and deprecated.** The maximum number of concurrent checks. Replaced by the global constant `MaxConcurrentChecks`, which will be set if you still use `concurrent_checks`.
## CheckResultReader <a id="objecttype-checkresultreader"></a>

View File

@ -398,6 +398,7 @@ PlatformKernelVersion|**Read-only.** The version of the operating system kernel,
BuildCompilerName |**Read-only.** The name of the compiler Icinga was built with, e.g. "Clang".
BuildCompilerVersion|**Read-only.** The version of the compiler Icinga was built with, e.g. "7.3.0.7030031".
BuildHostName |**Read-only.** The name of the host Icinga was built on, e.g. "acheron".
MaxConcurrentChecks |**Read-write.** The maximum number of checks run simultaneously. Defaults to 512.
Advanced runtime constants. Please only use them if advised by support or developers.

View File

@ -161,6 +161,7 @@ static int Main()
Application::DeclareRLimitStack(Application::GetDefaultRLimitStack());
#endif /* __linux__ */
Application::DeclareConcurrency(std::thread::hardware_concurrency());
Application::DeclareMaxConcurrentChecks(Application::GetDefaultMaxConcurrentChecks());
ScriptGlobal::Set("AttachDebugger", false);

View File

@ -1552,6 +1552,48 @@ int Application::GetConcurrency()
return ScriptGlobal::Get("Concurrency", &defaultConcurrency);
}
/**
 * Unconditionally stores a new value in the "MaxConcurrentChecks"
 * script global, replacing whatever was there before.
 *
 * @param maxChecks The limit to store.
 */
void Application::SetMaxConcurrentChecks(int maxChecks)
{
	const int newLimit = maxChecks;

	ScriptGlobal::Set("MaxConcurrentChecks", newLimit);
}
/**
 * Declares the "MaxConcurrentChecks" script global. An already existing
 * value is left untouched; the global is only written when it does not
 * exist yet.
 *
 * @param maxChecks The limit to use when none has been set so far.
 */
void Application::DeclareMaxConcurrentChecks(int maxChecks)
{
	/* Somebody already provided a value - keep it. */
	if (ScriptGlobal::Exists("MaxConcurrentChecks"))
		return;

	ScriptGlobal::Set("MaxConcurrentChecks", maxChecks);
}
/**
* Retrieves the max concurrent checks.
*
* @returns The max number of concurrent checks.
*/
int Application::GetMaxConcurrentChecks()
{
Value defaultMaxConcurrentChecks = GetDefaultMaxConcurrentChecks();
return ScriptGlobal::Get("MaxConcurrentChecks", &defaultMaxConcurrentChecks);
}
/**
 * Provides the built-in limit for concurrently running checks.
 *
 * @returns The default max number of concurrent checks (512).
 */
int Application::GetDefaultMaxConcurrentChecks()
{
	const int builtinLimit = 512;

	return builtinLimit;
}
/**
* Returns the global thread pool.
*

View File

@ -146,6 +146,11 @@ public:
static int GetConcurrency();
static void DeclareConcurrency(int ncpus);
static int GetMaxConcurrentChecks();
static int GetDefaultMaxConcurrentChecks();
static void DeclareMaxConcurrentChecks(int maxChecks);
static void SetMaxConcurrentChecks(int maxChecks);
static ThreadPool& GetTP();
static String GetAppVersion();

View File

@ -27,8 +27,14 @@ namespace icinga
class CheckerComponent : ConfigObject
{
	[config] int concurrent_checks {
		get {{{
			/* The attribute has no storage of its own: it reads through to the global limit. */
			return Application::GetMaxConcurrentChecks();
		}}}
		set {{{
			/* Writing the attribute overwrites the global limit. */
			Application::SetMaxConcurrentChecks(value);
		}}}
		default {{{
			/* Use the shared default instead of a second hard-coded 512, so both places cannot drift apart. */
			return Application::GetDefaultMaxConcurrentChecks();
		}}}
	};
};

View File

@ -49,7 +49,7 @@ set(icinga_SOURCES
checkcommand.cpp checkcommand.hpp checkcommand-ti.hpp
checkresult.cpp checkresult.hpp checkresult-ti.hpp
cib.cpp cib.hpp
clusterevents.cpp clusterevents.hpp
clusterevents.cpp clusterevents.hpp clusterevents-check.cpp
command.cpp command.hpp command-ti.hpp
comment.cpp comment.hpp comment-ti.hpp
compatutility.cpp compatutility.hpp

View File

@ -42,6 +42,7 @@ boost::signals2::signal<void (const Checkable::Ptr&)> Checkable::OnNextCheckUpda
/* Locked around every access to m_PendingChecks in this file. */
boost::mutex Checkable::m_StatsMutex;
/* Counter modified by DecreasePendingChecks() and AquirePendingCheckSlot(); read by GetPendingChecks(). */
int Checkable::m_PendingChecks = 0;
/* Notified by DecreasePendingChecks(); AquirePendingCheckSlot() waits on it. */
boost::condition_variable Checkable::m_PendingChecksCV;
CheckCommand::Ptr Checkable::GetCheckCommand() const
{
@ -544,6 +545,7 @@ void Checkable::DecreasePendingChecks()
{
boost::mutex::scoped_lock lock(m_StatsMutex);
m_PendingChecks--;
m_PendingChecksCV.notify_one();
}
int Checkable::GetPendingChecks()
@ -551,3 +553,12 @@ int Checkable::GetPendingChecks()
boost::mutex::scoped_lock lock(m_StatsMutex);
return m_PendingChecks;
}
void Checkable::AquirePendingCheckSlot(int maxPendingChecks)
{
boost::mutex::scoped_lock lock(m_StatsMutex);
while (m_PendingChecks >= maxPendingChecks)
m_PendingChecksCV.wait(lock);
m_PendingChecks++;
}

View File

@ -197,6 +197,7 @@ public:
static void IncreasePendingChecks();
static void DecreasePendingChecks();
static int GetPendingChecks();
/* Waits until fewer than maxPendingChecks checks are pending, then increments the pending counter. */
static void AquirePendingCheckSlot(int maxPendingChecks);
static Object::Ptr GetPrototype();
@ -211,6 +212,7 @@ private:
static boost::mutex m_StatsMutex;
static int m_PendingChecks;
/* Notified whenever the pending check counter is decreased. */
static boost::condition_variable m_PendingChecksCV;
/* Downtimes */
std::set<Downtime::Ptr> m_Downtimes;

View File

@ -20,6 +20,7 @@
#include "icinga/cib.hpp"
#include "icinga/host.hpp"
#include "icinga/service.hpp"
#include "icinga/clusterevents.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
@ -305,6 +306,8 @@ void CIB::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) {
status->Set("active_service_checks_15min", GetActiveServiceChecksStatistics(60 * 15));
status->Set("passive_service_checks_15min", GetPassiveServiceChecksStatistics(60 * 15));
status->Set("remote_check_queue", ClusterEvents::GetCheckRequestQueueSize());
CheckableCheckStatistics scs = CalculateServiceCheckStats();
status->Set("min_latency", scs.min_latency);

View File

@ -0,0 +1,225 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "icinga/clusterevents.hpp"
#include "remote/apilistener.hpp"
#include "base/serializer.hpp"
#include "base/exception.hpp"
#include <boost/thread/once.hpp>
#include <thread>
using namespace icinga;
/* Held by EnqueueCheck() and RemoteCheckThreadProc() while they touch the queue, the running flag and the counters. */
boost::mutex ClusterEvents::m_Mutex;
/* Pending remote check requests; EnqueueCheck() appends, RemoteCheckThreadProc() pops from the front. */
std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
/* Set by EnqueueCheck() when it starts the scheduler thread, cleared by that thread when the queue is empty. */
bool ClusterEvents::m_CheckSchedulerRunning;
/* Requests taken off the queue since the last LogRemoteCheckQueueInformation() call. */
int ClusterEvents::m_ChecksExecutedDuringInterval;
/* Requests rejected because the queue was full, since the last LogRemoteCheckQueueInformation() call. */
int ClusterEvents::m_ChecksDroppedDuringInterval;
/* Created once by EnqueueCheck() with an interval of 10; calls LogRemoteCheckQueueInformation(). */
Timer::Ptr ClusterEvents::m_LogTimer;
void ClusterEvents::RemoteCheckThreadProc()
{
Utility::SetThreadName("Remote Check Scheduler");
boost::mutex::scoped_lock lock(m_Mutex);
for(;;) {
if (m_CheckRequestQueue.empty())
break;
lock.unlock();
Checkable::AquirePendingCheckSlot(Application::GetMaxConcurrentChecks());
lock.lock();
auto callback = m_CheckRequestQueue.front();
m_CheckRequestQueue.pop_front();
m_ChecksExecutedDuringInterval++;
lock.unlock();
callback();
Checkable::DecreasePendingChecks();
lock.lock();
}
m_CheckSchedulerRunning = false;
}
/**
 * Appends an 'execute command' request to the remote check queue and makes
 * sure a scheduler thread is running. The request is dropped (and counted
 * as dropped) when the queue already holds 25000 entries.
 *
 * @param origin The origin of the cluster message.
 * @param params The parameters of the cluster message.
 */
void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	static boost::once_flag logTimerOnce = BOOST_ONCE_INIT;

	/* The first request ever seen starts the periodic queue statistics log. */
	boost::call_once(logTimerOnce, []() {
		m_LogTimer = new Timer();
		m_LogTimer->SetInterval(10);
		m_LogTimer->OnTimerExpired.connect(std::bind(ClusterEvents::LogRemoteCheckQueueInformation));
		m_LogTimer->Start();
	});

	boost::mutex::scoped_lock queueLock(m_Mutex);

	const bool queueFull = m_CheckRequestQueue.size() >= 25000;

	if (queueFull) {
		m_ChecksDroppedDuringInterval++;
		return;
	}

	m_CheckRequestQueue.push_back([origin, params]() {
		ClusterEvents::ExecuteCheckFromQueue(origin, params);
	});

	if (m_CheckSchedulerRunning)
		return;

	/* No scheduler is draining the queue right now - start one. */
	std::thread scheduler(ClusterEvents::RemoteCheckThreadProc);
	scheduler.detach();
	m_CheckSchedulerRunning = true;
}
/**
 * Processes one queued 'execute command' request: validates the sender,
 * builds a virtual host object for the command and runs the requested
 * check command or event command on it. Failures that concern a check
 * are reported back to the requesting endpoint as an UNKNOWN check result.
 *
 * @param origin The origin of the cluster message.
 * @param params The parameters of the cluster message ("host", optional
 *               "service", "command", "command_type" and "macros" are read).
 */
void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {
	Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint();

	/* Accept only a known endpoint; if a zone is given, the local zone must be a child of it. */
	const bool originAllowed = sourceEndpoint && (!origin->FromZone || Zone::GetLocalZone()->IsChildOf(origin->FromZone));

	if (!originAllowed) {
		Log(LogNotice, "ClusterEvents")
			<< "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed).";
		return;
	}

	ApiListener::Ptr listener = ApiListener::GetInstance();

	if (!listener) {
		Log(LogCritical, "ApiListener", "No instance available.");
		return;
	}

	if (!listener->GetAcceptCommands()) {
		Log(LogWarning, "ApiListener")
			<< "Ignoring command. '" << listener->GetName() << "' does not accept commands.";

		/* Answer with an UNKNOWN result for a throw-away host object. */
		Host::Ptr rejectedHost = new Host();
		Dictionary::Ptr rejectedAttrs = new Dictionary();

		rejectedAttrs->Set("__name", params->Get("host"));
		rejectedAttrs->Set("type", "Host");
		rejectedAttrs->Set("enable_active_checks", false);

		Deserialize(rejectedHost, rejectedAttrs, false, FAConfig);

		if (params->Contains("service"))
			rejectedHost->SetExtension("agent_service_name", params->Get("service"));

		CheckResult::Ptr rejectedResult = new CheckResult();
		rejectedResult->SetState(ServiceUnknown);
		rejectedResult->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.");

		Dictionary::Ptr rejectedMessage = MakeCheckResultMessage(rejectedHost, rejectedResult);
		listener->SyncSendMessage(sourceEndpoint, rejectedMessage);

		return;
	}

	/* use a virtual host object for executing the command */
	Host::Ptr host = new Host();
	Dictionary::Ptr attrs = new Dictionary();

	attrs->Set("__name", params->Get("host"));
	attrs->Set("type", "Host");

	Deserialize(host, attrs, false, FAConfig);

	if (params->Contains("service"))
		host->SetExtension("agent_service_name", params->Get("service"));

	String commandName = params->Get("command");
	String commandType = params->Get("command_type");

	const bool isCheckCommand = (commandType == "check_command");
	const bool isEventCommand = !isCheckCommand && (commandType == "event_command");

	/* Any other command type is ignored. */
	if (!isCheckCommand && !isEventCommand)
		return;

	if (isCheckCommand) {
		if (!CheckCommand::GetByName(commandName)) {
			CheckResult::Ptr missingResult = new CheckResult();
			missingResult->SetState(ServiceUnknown);
			missingResult->SetOutput("Check command '" + commandName + "' does not exist.");

			Dictionary::Ptr missingMessage = MakeCheckResultMessage(host, missingResult);
			listener->SyncSendMessage(sourceEndpoint, missingMessage);

			return;
		}
	} else {
		if (!EventCommand::GetByName(commandName)) {
			Log(LogWarning, "ClusterEvents")
				<< "Event command '" << commandName << "' does not exist.";
			return;
		}
	}

	/* Second deserialization pass: attach the command and the requesting endpoint to the host. */
	attrs->Set(commandType, params->Get("command"));
	attrs->Set("command_endpoint", sourceEndpoint->GetName());

	Deserialize(host, attrs, false, FAConfig);

	host->SetExtension("agent_check", true);

	Dictionary::Ptr macros = params->Get("macros");

	if (!isCheckCommand) {
		host->ExecuteEventHandler(macros, true);
		return;
	}

	try {
		host->ExecuteRemoteCheck(macros);
	} catch (const std::exception& ex) {
		/* Report the failure to the requesting endpoint instead of losing it. */
		CheckResult::Ptr failedResult = new CheckResult();
		failedResult->SetState(ServiceUnknown);

		String output = "Exception occured while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
		failedResult->SetOutput(output);

		double now = Utility::GetTime();
		failedResult->SetScheduleStart(now);
		failedResult->SetScheduleEnd(now);
		failedResult->SetExecutionStart(now);
		failedResult->SetExecutionEnd(now);

		Dictionary::Ptr failedMessage = MakeCheckResultMessage(host, failedResult);
		listener->SyncSendMessage(sourceEndpoint, failedMessage);

		Log(LogCritical, "checker", output);
	}
}
int ClusterEvents::GetCheckRequestQueueSize()
{
return m_CheckRequestQueue.size();
}
void ClusterEvents::LogRemoteCheckQueueInformation() {
if (m_ChecksDroppedDuringInterval > 0) {
Log(LogCritical, "ClusterEvents")
<< "Remote check queue ran out of slots. "
<< m_ChecksDroppedDuringInterval << " checks dropped.";
m_ChecksDroppedDuringInterval = 0;
}
if (m_ChecksExecutedDuringInterval == 0)
return;
Log(LogInformation, "RemoteCheckQueue")
<< "items: " << m_CheckRequestQueue.size()
<< ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
<< "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
<< m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
<< m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
m_ChecksExecutedDuringInterval = 0;
}

View File

@ -578,112 +578,7 @@ Value ClusterEvents::AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr&
/**
 * API handler for 'execute command' cluster messages.
 *
 * The handler only puts the request into the remote check queue. Validation
 * of the sender and the actual command execution are done by
 * ExecuteCheckFromQueue() once the scheduler thread picks the request up.
 * Running the command here as well would execute every request twice and
 * bypass the concurrent checks limit.
 *
 * @param origin The origin of the cluster message.
 * @param params The parameters of the cluster message.
 * @returns Always Empty.
 */
Value ClusterEvents::ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	EnqueueCheck(origin, params);

	return Empty;
}

View File

@ -74,6 +74,21 @@ public:
static void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType notificationType, const CheckResult::Ptr& cr, const String& author, const String& commentText, const MessageOrigin::Ptr& origin);
static Value NotificationSentToAllUsersAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
/* Number of requests currently in the remote check queue. */
static int GetCheckRequestQueueSize();
/* Logs and resets the dropped/executed counters. */
static void LogRemoteCheckQueueInformation();
private:
/* Locked by EnqueueCheck() and RemoteCheckThreadProc() around queue, flag and counter updates. */
static boost::mutex m_Mutex;
/* Pending 'execute command' requests, processed front to back. */
static std::deque<std::function<void ()>> m_CheckRequestQueue;
/* True while a scheduler thread started by EnqueueCheck() is draining the queue. */
static bool m_CheckSchedulerRunning;
/* Requests dequeued since the last log interval. */
static int m_ChecksExecutedDuringInterval;
/* Requests dropped because the queue was full since the last log interval. */
static int m_ChecksDroppedDuringInterval;
/* Timer that triggers LogRemoteCheckQueueInformation(). */
static Timer::Ptr m_LogTimer;
/* Entry point of the scheduler thread. */
static void RemoteCheckThreadProc();
/* Queues a request and starts the scheduler thread if none is running. */
static void EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
/* Validates and executes one queued request. */
static void ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
};
}

View File

@ -23,6 +23,7 @@
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/clusterevents.hpp"
#include "base/application.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
@ -84,6 +85,8 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
perfdata->Add(new PerfdataValue("active_service_checks_15min", CIB::GetActiveServiceChecksStatistics(60 * 15)));
perfdata->Add(new PerfdataValue("passive_service_checks_15min", CIB::GetPassiveServiceChecksStatistics(60 * 15)));
perfdata->Add(new PerfdataValue("remote_check_queue", ClusterEvents::GetCheckRequestQueueSize()));
CheckableCheckStatistics scs = CIB::CalculateServiceCheckStats();
perfdata->Add(new PerfdataValue("min_latency", scs.min_latency));