From bfc5d2a4bed35bb7256239e342dfef27df4b8978 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 12 Mar 2014 10:05:36 +0100 Subject: [PATCH] Use fewer threads for plugin-based checks. Fixes #5748 --- components/compat/checkresultreader.cpp | 5 +- lib/base/process-unix.cpp | 177 ++++++++++++++++++------ lib/base/process.h | 13 +- lib/icinga/checkcommand.cpp | 5 +- lib/icinga/checkcommand.h | 2 +- lib/icinga/externalcommandprocessor.cpp | 10 +- lib/icinga/pluginutility.cpp | 9 +- lib/icinga/pluginutility.h | 2 +- lib/icinga/service-check.cpp | 52 ++----- lib/methods/icingachecktask.cpp | 5 +- lib/methods/icingachecktask.h | 2 +- lib/methods/nullchecktask.cpp | 5 +- lib/methods/nullchecktask.h | 2 +- lib/methods/pluginchecktask.cpp | 32 +++-- lib/methods/pluginchecktask.h | 6 +- lib/methods/pluginnotificationtask.cpp | 7 +- lib/methods/pluginnotificationtask.h | 4 + 17 files changed, 218 insertions(+), 120 deletions(-) diff --git a/components/compat/checkresultreader.cpp b/components/compat/checkresultreader.cpp index 931228751..512190208 100644 --- a/components/compat/checkresultreader.cpp +++ b/components/compat/checkresultreader.cpp @@ -133,7 +133,10 @@ void CheckResultReader::ProcessCheckResultFile(const String& path) const return; } - CheckResult::Ptr result = PluginUtility::ParseCheckOutput(attrs["output"]); + CheckResult::Ptr result = make_shared(); + std::pair co = PluginUtility::ParseCheckOutput(attrs["output"]); + result->SetOutput(co.first); + result->SetPerformanceData(co.second); result->SetState(PluginUtility::ExitStatusToState(Convert::ToLong(attrs["return_code"]))); result->SetExecutionStart(Convert::ToDouble(attrs["start_time"])); result->SetExecutionEnd(Convert::ToDouble(attrs["finish_time"])); diff --git a/lib/base/process-unix.cpp b/lib/base/process-unix.cpp index 15e7f6837..f2a3b209f 100644 --- a/lib/base/process-unix.cpp +++ b/lib/base/process-unix.cpp @@ -21,6 +21,7 @@ #include "base/exception.h" #include "base/convert.h" #include "base/objectlock.h" +#include "base/initialize.h" #include "base/logger_fwd.h" #include "base/utility.h" #include "base/scriptvariable.h" @@ -43,11 +44,104 @@ extern char **environ; #define environ (*_NSGetEnviron()) #endif /* __APPLE__ */ -ProcessResult Process::Run(void) -{ - ProcessResult result; +static boost::mutex l_ProcessMutex; +static std::map l_Processes; +static int l_EventFDs[2]; - result.ExecutionStart = Utility::GetTime(); +INITIALIZE_ONCE(&Process::StaticInitialize); + +void Process::StaticInitialize(void) +{ +#ifdef HAVE_PIPE2 + if (pipe2(l_EventFDs, O_CLOEXEC) < 0) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("pipe2") + << boost::errinfo_errno(errno)); + } +#else /* HAVE_PIPE2 */ + if (pipe(l_EventFDs) < 0) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("pipe") + << boost::errinfo_errno(errno)); + } + + Utility::SetCloExec(fds[0]); + Utility::SetCloExec(fds[1]); +#endif /* HAVE_PIPE2 */ + + Utility::SetNonBlocking(l_EventFDs[0]); + Utility::SetNonBlocking(l_EventFDs[1]); + + boost::thread t(&Process::IOThreadProc); + t.detach(); +} + +void Process::IOThreadProc(void) +{ + pollfd *pfds = NULL; + int count = 0; + + for (;;) { + double timeout = 1; + + { + boost::mutex::scoped_lock lock(l_ProcessMutex); + + count = 1 + l_Processes.size(); + pfds = reinterpret_cast(realloc(pfds, sizeof(pollfd) * count)); + + pfds[0].fd = l_EventFDs[0]; + pfds[0].events = POLLIN; + pfds[0].revents = 0; + + int i = 1; + std::pair kv; + BOOST_FOREACH(kv, l_Processes) { + pfds[i].fd = kv.second->m_FD; + pfds[i].events = POLLIN; + pfds[i].revents = 0; + + if (kv.second->GetTimeout() != 0 && kv.second->GetTimeout() < timeout) + timeout = kv.second->GetTimeout(); + + i++; + } + } + + int rc = poll(pfds, count, timeout * 1000); + + if (rc < 0) + continue; + + { + boost::mutex::scoped_lock lock(l_ProcessMutex); + + if (pfds[0].revents & (POLLIN|POLLHUP|POLLERR)) { + char buffer[512]; + (void) read(l_EventFDs[0], buffer, sizeof(buffer)); + } + + for (int i = 1; i < count; i++) { + if (pfds[i].revents & (POLLIN|POLLHUP|POLLERR)) { + std::map::iterator it; + it = l_Processes.find(pfds[i].fd); + + if (it == l_Processes.end()) + continue; /* This should never happen. */ + + if (!it->second->DoEvents()) { + (void) close(it->first); + l_Processes.erase(it); + } + } + } + } + } +} + +void Process::Run(const boost::function& callback) +{ + m_Result.ExecutionStart = Utility::GetTime(); int fds[2]; @@ -159,51 +253,51 @@ ProcessResult Process::Run(void) delete [] envp; - int fd = fds[0]; (void) close(fds[1]); - char buffer[512]; + Utility::SetNonBlocking(fds[0]); - std::ostringstream outputStream; + m_FD = fds[0]; + m_Callback = callback; - pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; + { + boost::mutex::scoped_lock lock(l_ProcessMutex); + l_Processes[m_FD] = GetSelf(); + } - for (;;) { - int rc1, timeout; + (void) write(l_EventFDs[1], "T", 1); +} - timeout = 0; +bool Process::DoEvents(void) +{ + if (m_Timeout != 0) { + double timeout = m_Timeout - (Utility::GetTime() - m_Result.ExecutionStart); - if (m_Timeout != 0) { - timeout = m_Timeout - (Utility::GetTime() - result.ExecutionStart); - - if (timeout < 0) { - outputStream << ""; - kill(m_Pid, SIGKILL); - break; - } - } - - rc1 = poll(&pfd, 1, timeout * 1000); - - if (rc1 > 0) { - int rc2 = read(fd, buffer, sizeof(buffer)); - - if (rc2 <= 0) - break; - - outputStream.write(buffer, rc2); + if (timeout < 0) { + m_OutputStream << ""; + kill(m_Pid, SIGKILL); } } - String output = outputStream.str(); + char buffer[512]; + for (;;) { + int rc = read(m_FD, buffer, sizeof(buffer)); + + if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + return true; + + if (rc > 0) { + m_OutputStream.write(buffer, rc); + return true; + } + + break; + } + + String output = m_OutputStream.str(); int status, exitcode; - (void) close(fd); - if (waitpid(m_Pid, &status, 0) != m_Pid) { BOOST_THROW_EXCEPTION(posix_error() << boost::errinfo_api_function("waitpid") @@ -221,11 +315,14 @@ ProcessResult Process::Run(void) exitcode = 128; } - result.ExecutionEnd = Utility::GetTime(); - result.ExitStatus = exitcode; - result.Output = output; + m_Result.ExecutionEnd = Utility::GetTime(); + m_Result.ExitStatus = exitcode; + m_Result.Output = output; - return result; + if (m_Callback) + m_Callback(m_Result); + + return false; } #endif /* _WIN32 */ diff --git a/lib/base/process.h b/lib/base/process.h index fb88dd17b..65ccff4e7 100644 --- a/lib/base/process.h +++ b/lib/base/process.h @@ -63,9 +63,14 @@ public: void SetTimeout(double timeout); double GetTimeout(void) const; - ProcessResult Run(void); + void Run(const boost::function& callback = boost::function()); static std::vector SplitCommand(const Value& command); + +#ifndef _WIN32 + static void StaticInitialize(void); +#endif /* _WIN32 */ + private: std::vector m_Arguments; Dictionary::Ptr m_ExtraEnvironment; @@ -74,7 +79,13 @@ private: #ifndef _WIN32 pid_t m_Pid; + int m_FD; + std::ostringstream m_OutputStream; + boost::function m_Callback; + ProcessResult m_Result; + static void IOThreadProc(void); + bool DoEvents(void); #endif /* _WIN32 */ }; diff --git a/lib/icinga/checkcommand.cpp b/lib/icinga/checkcommand.cpp index fc72014fa..6017a6951 100644 --- a/lib/icinga/checkcommand.cpp +++ b/lib/icinga/checkcommand.cpp @@ -24,9 +24,10 @@ using namespace icinga; REGISTER_TYPE(CheckCommand); -CheckResult::Ptr CheckCommand::Execute(const Service::Ptr& service) +void CheckCommand::Execute(const Service::Ptr& service, const CheckResult::Ptr& cr) { std::vector arguments; arguments.push_back(service); - return InvokeMethod("execute", arguments); + arguments.push_back(cr); + InvokeMethod("execute", arguments); } diff --git a/lib/icinga/checkcommand.h b/lib/icinga/checkcommand.h index 435eb056e..ec98eacc4 100644 --- a/lib/icinga/checkcommand.h +++ b/lib/icinga/checkcommand.h @@ -37,7 +37,7 @@ public: DECLARE_PTR_TYPEDEFS(CheckCommand); DECLARE_TYPENAME(CheckCommand); - virtual CheckResult::Ptr Execute(const Service::Ptr& service); + virtual void Execute(const Service::Ptr& service, const CheckResult::Ptr& cr); }; } diff --git a/lib/icinga/externalcommandprocessor.cpp b/lib/icinga/externalcommandprocessor.cpp index 00a43cfae..3243d4b29 100644 --- a/lib/icinga/externalcommandprocessor.cpp +++ b/lib/icinga/externalcommandprocessor.cpp @@ -234,7 +234,10 @@ void ExternalCommandProcessor::ProcessHostCheckResult(double time, const std::ve BOOST_THROW_EXCEPTION(std::invalid_argument("Got passive check result for host '" + arguments[0] + "' which has passive checks disabled.")); int exitStatus = Convert::ToDouble(arguments[1]); - CheckResult::Ptr result = PluginUtility::ParseCheckOutput(arguments[2]); + CheckResult::Ptr result = make_shared(); + std::pair co = PluginUtility::ParseCheckOutput(arguments[2]); + result->SetOutput(co.first); + result->SetPerformanceData(co.second); ServiceState state; @@ -281,7 +284,10 @@ void ExternalCommandProcessor::ProcessServiceCheckResult(double time, const std: BOOST_THROW_EXCEPTION(std::invalid_argument("Got passive check result for service '" + arguments[1] + "' which has passive checks disabled.")); int exitStatus = Convert::ToDouble(arguments[2]); - CheckResult::Ptr result = PluginUtility::ParseCheckOutput(arguments[3]); + CheckResult::Ptr result = make_shared(); + std::pair co = PluginUtility::ParseCheckOutput(arguments[3]); + result->SetOutput(co.first); + result->SetPerformanceData(co.second); result->SetState(PluginUtility::ExitStatusToState(exitStatus)); result->SetScheduleStart(time); diff --git a/lib/icinga/pluginutility.cpp b/lib/icinga/pluginutility.cpp index 9adebc291..afd232963 100644 --- a/lib/icinga/pluginutility.cpp +++ b/lib/icinga/pluginutility.cpp @@ -49,10 +49,8 @@ ServiceState PluginUtility::ExitStatusToState(int exitStatus) } } -CheckResult::Ptr PluginUtility::ParseCheckOutput(const String& output) +std::pair PluginUtility::ParseCheckOutput(const String& output) { - CheckResult::Ptr result = make_shared(); - String text; String perfdata; @@ -79,10 +77,7 @@ CheckResult::Ptr PluginUtility::ParseCheckOutput(const String& output) boost::algorithm::trim(perfdata); - result->SetOutput(text); - result->SetPerformanceData(ParsePerfdata(perfdata)); - - return result; + return std::make_pair(text, ParsePerfdata(perfdata)); } Value PluginUtility::ParsePerfdata(const String& perfdata) diff --git a/lib/icinga/pluginutility.h b/lib/icinga/pluginutility.h index a2a190861..3b246ff1f 100644 --- a/lib/icinga/pluginutility.h +++ b/lib/icinga/pluginutility.h @@ -39,7 +39,7 @@ class I2_ICINGA_API PluginUtility { public: static ServiceState ExitStatusToState(int exitStatus); - static CheckResult::Ptr ParseCheckOutput(const String& output); + static std::pair ParseCheckOutput(const String& output); static Value ParsePerfdata(const String& perfdata); static String FormatPerfdata(const Value& perfdata); diff --git a/lib/icinga/service-check.cpp b/lib/icinga/service-check.cpp index 1130fbbe9..4a2ab06f3 100644 --- a/lib/icinga/service-check.cpp +++ b/lib/icinga/service-check.cpp @@ -220,6 +220,11 @@ void Service::SetMaxCheckAttempts(int attempts) void Service::ProcessCheckResult(const CheckResult::Ptr& cr, const String& authority) { + { + ObjectLock olock(this); + m_CheckRunning = false; + } + double now = Utility::GetTime(); if (cr->GetScheduleStart() == 0) @@ -500,55 +505,20 @@ void Service::ExecuteCheck(void) SetLastReachable(reachable); } + UpdateNextCheck(); + /* keep track of scheduling info in case the check type doesn't provide its own information */ double scheduled_start = GetNextCheck(); double before_check = Utility::GetTime(); Service::Ptr self = GetSelf(); - CheckResult::Ptr result; + CheckResult::Ptr result = make_shared(); - try { - result = GetCheckCommand()->Execute(GetSelf()); - } catch (const std::exception& ex) { - std::ostringstream msgbuf; - msgbuf << "Exception occured during check for service '" - << GetName() << "': " << DiagnosticInformation(ex); - String message = msgbuf.str(); + result->SetScheduleStart(scheduled_start); + result->SetExecutionStart(before_check); - Log(LogWarning, "icinga", message); - - result = make_shared(); - result->SetState(StateUnknown); - result->SetOutput(message); - } - - double after_check = Utility::GetTime(); - - if (result) { - if (result->GetScheduleStart() == 0) - result->SetScheduleStart(scheduled_start); - - if (result->GetScheduleEnd() == 0) - result->SetScheduleEnd(after_check); - - if (result->GetExecutionStart() == 0) - result->SetExecutionStart(before_check); - - if (result->GetExecutionEnd() == 0) - result->SetExecutionEnd(after_check); - } - - /* update next check before processing any result */ - UpdateNextCheck(); - - if (result) - ProcessCheckResult(result); - - { - ObjectLock olock(this); - m_CheckRunning = false; - } + Utility::QueueAsyncCallback(boost::bind(&CheckCommand::Execute, GetCheckCommand(), GetSelf(), result)); } void Service::UpdateStatistics(const CheckResult::Ptr& cr) diff --git a/lib/methods/icingachecktask.cpp b/lib/methods/icingachecktask.cpp index e3b5fc65c..c791db92e 100644 --- a/lib/methods/icingachecktask.cpp +++ b/lib/methods/icingachecktask.cpp @@ -31,7 +31,7 @@ using namespace icinga; REGISTER_SCRIPTFUNCTION(IcingaCheck, &IcingaCheckTask::ScriptFunc); -CheckResult::Ptr IcingaCheckTask::ScriptFunc(const Service::Ptr&) +void IcingaCheckTask::ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr) { double interval = Utility::GetTime() - Application::GetStartTime(); @@ -79,12 +79,11 @@ CheckResult::Ptr IcingaCheckTask::ScriptFunc(const Service::Ptr&) perfdata->Set("num_hosts_in_downtime", hs.hosts_in_downtime); perfdata->Set("num_hosts_acknowledged", hs.hosts_acknowledged); - CheckResult::Ptr cr = make_shared(); cr->SetOutput("Icinga 2 is running."); cr->SetPerformanceData(perfdata); cr->SetState(StateOK); cr->SetCheckSource(IcingaApplication::GetInstance()->GetNodeName()); - return cr; + service->ProcessCheckResult(cr); } diff --git a/lib/methods/icingachecktask.h b/lib/methods/icingachecktask.h index fbacb4229..38d6b037a 100644 --- a/lib/methods/icingachecktask.h +++ b/lib/methods/icingachecktask.h @@ -34,7 +34,7 @@ namespace icinga class I2_METHODS_API IcingaCheckTask { public: - static CheckResult::Ptr ScriptFunc(const Service::Ptr& service); + static void ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr); private: IcingaCheckTask(void); diff --git a/lib/methods/nullchecktask.cpp b/lib/methods/nullchecktask.cpp index 3f621e78f..f525fb719 100644 --- a/lib/methods/nullchecktask.cpp +++ b/lib/methods/nullchecktask.cpp @@ -30,7 +30,7 @@ using namespace icinga; REGISTER_SCRIPTFUNCTION(NullCheck, &NullCheckTask::ScriptFunc); -CheckResult::Ptr NullCheckTask::ScriptFunc(const Service::Ptr&) +void NullCheckTask::ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr) { String output = "Hello from "; output += Utility::GetHostName(); @@ -38,11 +38,10 @@ CheckResult::Ptr NullCheckTask::ScriptFunc(const Service::Ptr&) Dictionary::Ptr perfdata = make_shared(); perfdata->Set("time", Convert::ToDouble(Utility::GetTime())); - CheckResult::Ptr cr = make_shared(); cr->SetOutput(output); cr->SetPerformanceData(perfdata); cr->SetState(StateOK); - return cr; + service->ProcessCheckResult(cr); } diff --git a/lib/methods/nullchecktask.h b/lib/methods/nullchecktask.h index 932380ba7..cc76a806b 100644 --- a/lib/methods/nullchecktask.h +++ b/lib/methods/nullchecktask.h @@ -35,7 +35,7 @@ namespace icinga class I2_METHODS_API NullCheckTask { public: - static CheckResult::Ptr ScriptFunc(const Service::Ptr& service); + static void ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr); private: NullCheckTask(void); diff --git a/lib/methods/pluginchecktask.cpp b/lib/methods/pluginchecktask.cpp index 3cd2077bd..6fea518f1 100644 --- a/lib/methods/pluginchecktask.cpp +++ b/lib/methods/pluginchecktask.cpp @@ -35,7 +35,7 @@ using namespace icinga; REGISTER_SCRIPTFUNCTION(PluginCheck, &PluginCheckTask::ScriptFunc); -CheckResult::Ptr PluginCheckTask::ScriptFunc(const Service::Ptr& service) +void PluginCheckTask::ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr) { CheckCommand::Ptr commandObj = service->GetCheckCommand(); Value raw_command = commandObj->GetCommandLine(); @@ -65,22 +65,28 @@ CheckResult::Ptr PluginCheckTask::ScriptFunc(const Service::Ptr& service) } } + cr->SetCommand(command); + Process::Ptr process = make_shared(Process::SplitCommand(command), envMacros); process->SetTimeout(commandObj->GetTimeout()); - ProcessResult pr = process->Run(); + process->Run(boost::bind(&PluginCheckTask::ProcessFinishedHandler, service, cr, _1)); - String output = pr.Output; - output.Trim(); - CheckResult::Ptr result = PluginUtility::ParseCheckOutput(output); - result->SetCommand(command); - result->SetState(PluginUtility::ExitStatusToState(pr.ExitStatus)); - result->SetExitStatus(pr.ExitStatus); - result->SetExecutionStart(pr.ExecutionStart); - result->SetExecutionEnd(pr.ExecutionEnd); - result->SetCheckSource(IcingaApplication::GetInstance()->GetNodeName()); - - return result; } +void PluginCheckTask::ProcessFinishedHandler(const Service::Ptr& service, const CheckResult::Ptr& cr, const ProcessResult& pr) +{ + String output = pr.Output; + output.Trim(); + std::pair co = PluginUtility::ParseCheckOutput(output); + cr->SetOutput(co.first); + cr->SetPerformanceData(co.second); + cr->SetState(PluginUtility::ExitStatusToState(pr.ExitStatus)); + cr->SetExitStatus(pr.ExitStatus); + cr->SetExecutionStart(pr.ExecutionStart); + cr->SetExecutionEnd(pr.ExecutionEnd); + cr->SetCheckSource(IcingaApplication::GetInstance()->GetNodeName()); + + service->ProcessCheckResult(cr); +} diff --git a/lib/methods/pluginchecktask.h b/lib/methods/pluginchecktask.h index fa53670be..02a64f8ea 100644 --- a/lib/methods/pluginchecktask.h +++ b/lib/methods/pluginchecktask.h @@ -21,6 +21,7 @@ #define PLUGINCHECKTASK_H #include "methods/i2-methods.h" +#include "base/process.h" #include "icinga/service.h" namespace icinga @@ -34,10 +35,13 @@ namespace icinga class I2_METHODS_API PluginCheckTask { public: - static CheckResult::Ptr ScriptFunc(const Service::Ptr& service); + static void ScriptFunc(const Service::Ptr& service, const CheckResult::Ptr& cr); private: PluginCheckTask(void); + + static void ProcessFinishedHandler(const Service::Ptr& service, const CheckResult::Ptr& cr, const ProcessResult& pr); + }; } diff --git a/lib/methods/pluginnotificationtask.cpp b/lib/methods/pluginnotificationtask.cpp index 78ff7eee4..c39332a8f 100644 --- a/lib/methods/pluginnotificationtask.cpp +++ b/lib/methods/pluginnotificationtask.cpp @@ -82,8 +82,11 @@ void PluginNotificationTask::ScriptFunc(const Notification::Ptr& notification, c process->SetTimeout(commandObj->GetTimeout()); - ProcessResult pr = process->Run(); + process->Run(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, service, command, _1)); +} +void PluginNotificationTask::ProcessFinishedHandler(const Service::Ptr& service, const Value& command, const ProcessResult& pr) +{ if (pr.ExitStatus != 0) { std::ostringstream msgbuf; msgbuf << "Notification command '" << command << "' for service '" @@ -91,4 +94,4 @@ void PluginNotificationTask::ScriptFunc(const Notification::Ptr& notification, c << pr.ExitStatus << ", output: " << pr.Output; Log(LogWarning, "icinga", msgbuf.str()); } -} +} \ No newline at end of file diff --git a/lib/methods/pluginnotificationtask.h b/lib/methods/pluginnotificationtask.h index 2ecb2cbf4..f6cafbc56 100644 --- a/lib/methods/pluginnotificationtask.h +++ b/lib/methods/pluginnotificationtask.h @@ -22,6 +22,8 @@ #include "methods/i2-methods.h" #include "icinga/notification.h" +#include "icinga/service.h" +#include "base/process.h" namespace icinga { @@ -40,6 +42,8 @@ public: private: PluginNotificationTask(void); + + static void ProcessFinishedHandler(const Service::Ptr& service, const Value& command, const ProcessResult& pr); }; }