2019-02-25 14:48:22 +01:00
|
|
|
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
2018-01-16 10:40:08 +01:00
|
|
|
|
|
|
|
#include "icinga/clusterevents.hpp"
|
2019-04-15 16:56:30 +02:00
|
|
|
#include "icinga/icingaapplication.hpp"
|
2018-01-16 10:40:08 +01:00
|
|
|
#include "remote/apilistener.hpp"
|
2019-04-15 16:56:30 +02:00
|
|
|
#include "base/configuration.hpp"
|
2018-01-16 10:40:08 +01:00
|
|
|
#include "base/serializer.hpp"
|
|
|
|
#include "base/exception.hpp"
|
2018-01-18 15:22:16 +01:00
|
|
|
#include <boost/thread/once.hpp>
|
2018-01-16 10:40:08 +01:00
|
|
|
#include <thread>
|
|
|
|
|
|
|
|
using namespace icinga;
|
|
|
|
|
|
|
|
boost::mutex ClusterEvents::m_Mutex;
|
|
|
|
std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
|
|
|
|
bool ClusterEvents::m_CheckSchedulerRunning;
|
2018-01-18 15:22:16 +01:00
|
|
|
int ClusterEvents::m_ChecksExecutedDuringInterval;
|
|
|
|
int ClusterEvents::m_ChecksDroppedDuringInterval;
|
|
|
|
Timer::Ptr ClusterEvents::m_LogTimer;
|
2018-01-16 10:40:08 +01:00
|
|
|
|
|
|
|
void ClusterEvents::RemoteCheckThreadProc()
|
|
|
|
{
|
|
|
|
Utility::SetThreadName("Remote Check Scheduler");
|
|
|
|
|
2019-04-15 16:56:30 +02:00
|
|
|
int maxConcurrentChecks = IcingaApplication::GetInstance()->GetMaxConcurrentChecks();
|
|
|
|
|
2018-01-16 10:40:08 +01:00
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
for(;;) {
|
|
|
|
if (m_CheckRequestQueue.empty())
|
|
|
|
break;
|
|
|
|
|
|
|
|
lock.unlock();
|
2019-04-15 16:56:30 +02:00
|
|
|
Checkable::AquirePendingCheckSlot(maxConcurrentChecks);
|
2018-01-16 10:40:08 +01:00
|
|
|
lock.lock();
|
|
|
|
|
|
|
|
auto callback = m_CheckRequestQueue.front();
|
|
|
|
m_CheckRequestQueue.pop_front();
|
2018-01-18 15:22:16 +01:00
|
|
|
m_ChecksExecutedDuringInterval++;
|
2018-01-16 10:40:08 +01:00
|
|
|
lock.unlock();
|
|
|
|
|
|
|
|
callback();
|
|
|
|
Checkable::DecreasePendingChecks();
|
|
|
|
|
|
|
|
lock.lock();
|
|
|
|
}
|
|
|
|
|
|
|
|
m_CheckSchedulerRunning = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Queues a remote command execution request for the scheduler thread.
 *
 * On the very first call a timer is created which periodically invokes
 * LogRemoteCheckQueueInformation(). The request is dropped (and counted in
 * m_ChecksDroppedDuringInterval) when 25000 or more requests are already queued.
 * If no scheduler thread is currently marked as running, a detached one is started.
 *
 * @param origin Origin of the request; copied into the queued callback.
 * @param params Request parameters; copied into the queued callback.
 */
void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	static boost::once_flag logTimerOnce = BOOST_ONCE_INIT;

	/* One-time setup of the statistics timer. */
	boost::call_once(logTimerOnce, []() {
		m_LogTimer = new Timer();
		m_LogTimer->SetInterval(10);
		m_LogTimer->OnTimerExpired.connect(std::bind(ClusterEvents::LogRemoteCheckQueueInformation));
		m_LogTimer->Start();
	});

	boost::mutex::scoped_lock queueLock(m_Mutex);

	/* Hard cap on the number of pending requests; anything beyond it is discarded. */
	if (m_CheckRequestQueue.size() >= 25000) {
		++m_ChecksDroppedDuringInterval;
		return;
	}

	/* The lambda keeps its own copies of origin and params until the request is executed. */
	m_CheckRequestQueue.emplace_back([origin, params]() {
		ClusterEvents::ExecuteCheckFromQueue(origin, params);
	});

	/* A scheduler thread is already draining the queue - nothing more to do. */
	if (m_CheckSchedulerRunning)
		return;

	/* The new thread blocks on m_Mutex until this function returns, so the flag
	 * below is set before the thread looks at any shared state. */
	std::thread(ClusterEvents::RemoteCheckThreadProc).detach();
	m_CheckSchedulerRunning = true;
}
|
|
|
|
|
|
|
|
/**
 * Executes one queued "execute command" request.
 *
 * Steps, in order:
 *  - determine the endpoint the request came from and reject requests without a
 *    usable endpoint or from a zone the local zone is not a child of,
 *  - if the request carries a "source" entry, look up the referenced checkable,
 *    verify access and deadline and install a process-finished handler which
 *    reports the result back as an 'event::ExecutedCommand' message; otherwise the
 *    handler is reset,
 *  - if the local ApiListener does not accept commands, answer with an UNKNOWN
 *    check result,
 *  - otherwise build a virtual host object and run the requested check command
 *    or event command on it.
 *
 * @param origin Origin of the request. origin->FromClient may be null (local origin).
 * @param params Request parameters ("host", "service", "command", "command_type",
 *               "macros", optionally "source", "deadline", "check_timeout").
 */
void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {

	Endpoint::Ptr sourceEndpoint;
	if (origin->FromClient) {
		sourceEndpoint = origin->FromClient->GetEndpoint();
	} else if (origin->IsLocal()){
		sourceEndpoint = Endpoint::GetLocalEndpoint();
	}

	/* Identity used in log messages only. origin->FromClient is null for local
	 * origins, so it must not be dereferenced unconditionally when logging. */
	String originIdentity("(unknown)");
	if (origin->FromClient)
		originIdentity = origin->FromClient->GetIdentity();
	else if (origin->IsLocal())
		originIdentity = String("(local)");

	if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) {
		Log(LogNotice, "ClusterEvents")
			<< "Discarding 'execute command' message from '" << originIdentity << "': Invalid endpoint origin (client not allowed).";
		return;
	}

	ApiListener::Ptr listener = ApiListener::GetInstance();

	if (!listener) {
		Log(LogCritical, "ApiListener", "No instance available.");
		return;
	}

	if (params->Contains("source")) {
		Host::Ptr host = Host::GetByName(params->Get("host"));
		if (!host) {
			Log(LogCritical, "ApiListener", "Host not found.");
			return;
		}

		Checkable::Ptr checkable;
		if (params->Contains("service"))
			checkable = host->GetServiceByShortName(params->Get("service"));
		else
			checkable = host;

		if (!checkable) {
			Log(LogCritical, "ApiListener", "Checkable not found.");
			return;
		}

		/* Held until the end of this 'source' branch. */
		ObjectLock oLock (checkable);

		if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) {
			Log(LogNotice, "ApiListener")
				<< "Discarding 'ExecuteCheckFromQueue' event for checkable '" << checkable->GetName()
				<< "' from '" << originIdentity << "': Unauthorized access.";
			return;
		}

		/* Check deadline */
		double deadline = params->Get("deadline");
		if (Utility::GetTime() > deadline) {
			Log(LogNotice, "ApiListener")
				<< "Discarding 'ExecuteCheckFromQueue' event for checkable '" << checkable->GetName()
				<< "' from '" << originIdentity << "': Deadline has expired.";
			return;
		}

		/* Invoked once the command process has finished: fills the check result from
		 * the process result and reports it back to where the request came from. */
		Checkable::ExecuteCommandProcessFinishedHandler = [listener, sourceEndpoint, origin, params] (const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const Value& commandLine, const ProcessResult& pr) -> void {
			Checkable::CurrentConcurrentChecks.fetch_sub(1);
			Checkable::DecreasePendingChecks();

			if (pr.ExitStatus > 3) {
				Process::Arguments parguments = Process::PrepareCommand(commandLine);
				Log(LogWarning, "ApiListener")
					<< "Check command for object '" << checkable->GetName() << "' (PID: " << pr.PID
					<< ", arguments: " << Process::PrettyPrintArguments(parguments) << ") terminated with exit code "
					<< pr.ExitStatus << ", output: " << pr.Output;
			}

			String output = pr.Output.Trim();

			std::pair<String, String> co = PluginUtility::ParseCheckOutput(output);
			cr->SetCommand(commandLine);
			cr->SetOutput(co.first);
			cr->SetPerformanceData(PluginUtility::SplitPerfdata(co.second));
			cr->SetState(PluginUtility::ExitStatusToState(pr.ExitStatus));
			cr->SetExitStatus(pr.ExitStatus);
			cr->SetExecutionStart(pr.ExecutionStart);
			cr->SetExecutionEnd(pr.ExecutionEnd);

			/* Reply parameters: the request parameters plus the execution id and the result. */
			Dictionary::Ptr executedParams = new Dictionary();
			params->CopyTo(executedParams);
			executedParams->Set("execution", params->Get("source"));
			executedParams->Set("check_result", Serialize(cr));

			if (origin->IsLocal()) {
				ClusterEvents::ExecutedCommandAPIHandler(origin, executedParams);
			} else {
				Dictionary::Ptr executedMessage = new Dictionary();
				executedMessage->Set("jsonrpc", "2.0");
				executedMessage->Set("method", "event::ExecutedCommand");
				executedMessage->Set("params", executedParams);

				listener->SyncSendMessage(sourceEndpoint, executedMessage);
			}
		};
	} else {
		Checkable::ExecuteCommandProcessFinishedHandler = nullptr;
	}

	if (!listener->GetAcceptCommands()) {
		Log(LogWarning, "ApiListener")
			<< "Ignoring command. '" << listener->GetName() << "' does not accept commands.";

		/* Report the refusal back as an UNKNOWN result attached to a virtual host object. */
		Host::Ptr host = new Host();
		Dictionary::Ptr attrs = new Dictionary();

		attrs->Set("__name", params->Get("host"));
		attrs->Set("type", "Host");
		attrs->Set("enable_active_checks", false);

		Deserialize(host, attrs, false, FAConfig);

		if (params->Contains("service"))
			host->SetExtension("agent_service_name", params->Get("service"));

		CheckResult::Ptr cr = new CheckResult();
		cr->SetState(ServiceUnknown);
		cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.");
		Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
		listener->SyncSendMessage(sourceEndpoint, message);

		return;
	}

	/* use a virtual host object for executing the command */
	Host::Ptr host = new Host();
	Dictionary::Ptr attrs = new Dictionary();

	attrs->Set("__name", params->Get("host"));
	attrs->Set("type", "Host");

	/*
	 * Override the check timeout if the parent caller provided the value. Compatible with older versions not
	 * passing this inside the cluster message.
	 * This happens with host/service command_endpoint agents and the 'check_timeout' attribute being specified.
	 */
	if (params->Contains("check_timeout"))
		attrs->Set("check_timeout", params->Get("check_timeout"));

	Deserialize(host, attrs, false, FAConfig);

	if (params->Contains("service"))
		host->SetExtension("agent_service_name", params->Get("service"));

	String command = params->Get("command");
	String command_type = params->Get("command_type");

	/* Verify that the requested command exists locally; any other command type is ignored. */
	if (command_type == "check_command") {
		if (!CheckCommand::GetByName(command)) {
			CheckResult::Ptr cr = new CheckResult();
			cr->SetState(ServiceUnknown);
			cr->SetOutput("Check command '" + command + "' does not exist.");
			Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
			listener->SyncSendMessage(sourceEndpoint, message);
			return;
		}
	} else if (command_type == "event_command") {
		if (!EventCommand::GetByName(command)) {
			Log(LogWarning, "ClusterEvents")
				<< "Event command '" << command << "' does not exist.";
			return;
		}
	} else
		return;

	attrs->Set(command_type, params->Get("command"));
	attrs->Set("command_endpoint", sourceEndpoint->GetName());

	Deserialize(host, attrs, false, FAConfig);

	host->SetExtension("agent_check", true);

	Dictionary::Ptr macros = params->Get("macros");

	if (command_type == "check_command") {
		try {
			host->ExecuteRemoteCheck(macros);
		} catch (const std::exception& ex) {
			/* Turn the failure into an UNKNOWN check result and send it to the requester. */
			CheckResult::Ptr cr = new CheckResult();
			cr->SetState(ServiceUnknown);

			String output = "Exception occurred while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
			cr->SetOutput(output);

			double now = Utility::GetTime();
			cr->SetScheduleStart(now);
			cr->SetScheduleEnd(now);
			cr->SetExecutionStart(now);
			cr->SetExecutionEnd(now);

			Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
			listener->SyncSendMessage(sourceEndpoint, message);

			Log(LogCritical, "checker", output);
		}
	} else if (command_type == "event_command") {
		host->ExecuteEventHandler(macros, true);
	}
}
|
|
|
|
|
2018-01-18 15:22:16 +01:00
|
|
|
int ClusterEvents::GetCheckRequestQueueSize()
|
|
|
|
{
|
|
|
|
return m_CheckRequestQueue.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
void ClusterEvents::LogRemoteCheckQueueInformation() {
|
|
|
|
if (m_ChecksDroppedDuringInterval > 0) {
|
|
|
|
Log(LogCritical, "ClusterEvents")
|
|
|
|
<< "Remote check queue ran out of slots. "
|
|
|
|
<< m_ChecksDroppedDuringInterval << " checks dropped.";
|
|
|
|
m_ChecksDroppedDuringInterval = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (m_ChecksExecutedDuringInterval == 0)
|
|
|
|
return;
|
|
|
|
|
|
|
|
Log(LogInformation, "RemoteCheckQueue")
|
|
|
|
<< "items: " << m_CheckRequestQueue.size()
|
|
|
|
<< ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
|
|
|
|
<< "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
|
|
|
|
<< m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
|
|
|
|
<< m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
|
|
|
|
|
|
|
|
m_ChecksExecutedDuringInterval = 0;
|
2019-04-15 16:56:30 +02:00
|
|
|
}
|