mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-25 18:48:50 +02:00
Spawn Configuration::Concurrency process managers
This commit is contained in:
parent
338d0aaa8c
commit
c53b9ae5ab
@ -3,6 +3,7 @@
|
|||||||
#include "base/process.hpp"
|
#include "base/process.hpp"
|
||||||
#include "base/exception.hpp"
|
#include "base/exception.hpp"
|
||||||
#include "base/convert.hpp"
|
#include "base/convert.hpp"
|
||||||
|
#include "base/configuration.hpp"
|
||||||
#include "base/array.hpp"
|
#include "base/array.hpp"
|
||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
#include "base/utility.hpp"
|
#include "base/utility.hpp"
|
||||||
@ -11,8 +12,10 @@
|
|||||||
#include "base/utility.hpp"
|
#include "base/utility.hpp"
|
||||||
#include "base/scriptglobal.hpp"
|
#include "base/scriptglobal.hpp"
|
||||||
#include "base/json.hpp"
|
#include "base/json.hpp"
|
||||||
|
#include <algorithm>
|
||||||
#include <boost/algorithm/string/join.hpp>
|
#include <boost/algorithm/string/join.hpp>
|
||||||
#include <boost/thread/once.hpp>
|
#include <boost/thread/once.hpp>
|
||||||
|
#include <cstddef>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
@ -32,6 +35,21 @@ extern char **environ;
|
|||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
#define IOTHREADS 4
|
#define IOTHREADS 4
|
||||||
|
#define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this) / sizeof(void*) % l_ProcessControl.Len]
|
||||||
|
|
||||||
|
struct Spawner
|
||||||
|
{
|
||||||
|
boost::mutex Mutex;
|
||||||
|
int FD = -1;
|
||||||
|
pid_t PID = -1;
|
||||||
|
|
||||||
|
void StartSpawnProcessHelper();
|
||||||
|
void ProcessHandler();
|
||||||
|
pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]);
|
||||||
|
int ProcessKill(pid_t pid, int signum);
|
||||||
|
int ProcessWaitPID(pid_t pid, int *status);
|
||||||
|
Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request);
|
||||||
|
};
|
||||||
|
|
||||||
static boost::mutex l_ProcessMutex[IOTHREADS];
|
static boost::mutex l_ProcessMutex[IOTHREADS];
|
||||||
static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];
|
static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];
|
||||||
@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS];
|
|||||||
static int l_EventFDs[IOTHREADS][2];
|
static int l_EventFDs[IOTHREADS][2];
|
||||||
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];
|
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];
|
||||||
|
|
||||||
static boost::mutex l_ProcessControlMutex;
|
static struct {
|
||||||
static int l_ProcessControlFD = -1;
|
Spawner* Spawners = nullptr;
|
||||||
static pid_t l_ProcessControlPID;
|
size_t Len = 0;
|
||||||
|
} l_ProcessControl;
|
||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
|
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
|
||||||
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
|
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
|
||||||
@ -67,7 +86,7 @@ Process::~Process()
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifndef _WIN32
|
#ifndef _WIN32
|
||||||
static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
|
Value Spawner::ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
|
||||||
{
|
{
|
||||||
struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh);
|
struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh);
|
||||||
|
|
||||||
@ -130,7 +149,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
|
|||||||
if (pid == 0) {
|
if (pid == 0) {
|
||||||
// child process
|
// child process
|
||||||
|
|
||||||
(void)close(l_ProcessControlFD);
|
(void)close(FD);
|
||||||
|
|
||||||
if (setsid() < 0) {
|
if (setsid() < 0) {
|
||||||
perror("setsid() failed");
|
perror("setsid() failed");
|
||||||
@ -226,7 +245,7 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ProcessHandler()
|
void Spawner::ProcessHandler()
|
||||||
{
|
{
|
||||||
sigset_t mask;
|
sigset_t mask;
|
||||||
sigfillset(&mask);
|
sigfillset(&mask);
|
||||||
@ -240,7 +259,7 @@ static void ProcessHandler()
|
|||||||
maxfds = 65536;
|
maxfds = 65536;
|
||||||
|
|
||||||
for (rlim_t i = 3; i < maxfds; i++)
|
for (rlim_t i = 3; i < maxfds; i++)
|
||||||
if (i != static_cast<rlim_t>(l_ProcessControlFD))
|
if (i != static_cast<rlim_t>(FD))
|
||||||
(void)close(i);
|
(void)close(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,7 +280,7 @@ static void ProcessHandler()
|
|||||||
msg.msg_control = cbuf;
|
msg.msg_control = cbuf;
|
||||||
msg.msg_controllen = sizeof(cbuf);
|
msg.msg_controllen = sizeof(cbuf);
|
||||||
|
|
||||||
int rc = recvmsg(l_ProcessControlFD, &msg, 0);
|
int rc = recvmsg(FD, &msg, 0);
|
||||||
|
|
||||||
if (rc <= 0) {
|
if (rc <= 0) {
|
||||||
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
|
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
|
||||||
@ -274,7 +293,7 @@ static void ProcessHandler()
|
|||||||
|
|
||||||
size_t count = 0;
|
size_t count = 0;
|
||||||
while (count < length) {
|
while (count < length) {
|
||||||
rc = recv(l_ProcessControlFD, mbuf + count, length - count, 0);
|
rc = recv(FD, mbuf + count, length - count, 0);
|
||||||
|
|
||||||
if (rc <= 0) {
|
if (rc <= 0) {
|
||||||
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
|
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
|
||||||
@ -312,7 +331,7 @@ static void ProcessHandler()
|
|||||||
|
|
||||||
String jresponse = JsonEncode(response);
|
String jresponse = JsonEncode(response);
|
||||||
|
|
||||||
if (send(l_ProcessControlFD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
|
if (send(FD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
|
||||||
BOOST_THROW_EXCEPTION(posix_error()
|
BOOST_THROW_EXCEPTION(posix_error()
|
||||||
<< boost::errinfo_api_function("send")
|
<< boost::errinfo_api_function("send")
|
||||||
<< boost::errinfo_errno(errno));
|
<< boost::errinfo_errno(errno));
|
||||||
@ -322,13 +341,13 @@ static void ProcessHandler()
|
|||||||
_exit(0);
|
_exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void StartSpawnProcessHelper()
|
void Spawner::StartSpawnProcessHelper()
|
||||||
{
|
{
|
||||||
if (l_ProcessControlFD != -1) {
|
if (FD != -1) {
|
||||||
(void)close(l_ProcessControlFD);
|
(void)close(FD);
|
||||||
|
|
||||||
int status;
|
int status;
|
||||||
(void)waitpid(l_ProcessControlPID, &status, 0);
|
(void)waitpid(PID, &status, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int controlFDs[2];
|
int controlFDs[2];
|
||||||
@ -349,7 +368,7 @@ static void StartSpawnProcessHelper()
|
|||||||
if (pid == 0) {
|
if (pid == 0) {
|
||||||
(void)close(controlFDs[1]);
|
(void)close(controlFDs[1]);
|
||||||
|
|
||||||
l_ProcessControlFD = controlFDs[0];
|
FD = controlFDs[0];
|
||||||
|
|
||||||
ProcessHandler();
|
ProcessHandler();
|
||||||
|
|
||||||
@ -358,11 +377,11 @@ static void StartSpawnProcessHelper()
|
|||||||
|
|
||||||
(void)close(controlFDs[0]);
|
(void)close(controlFDs[0]);
|
||||||
|
|
||||||
l_ProcessControlFD = controlFDs[1];
|
FD = controlFDs[1];
|
||||||
l_ProcessControlPID = pid;
|
PID = pid;
|
||||||
}
|
}
|
||||||
|
|
||||||
static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
|
pid_t Spawner::ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
|
||||||
{
|
{
|
||||||
Dictionary::Ptr request = new Dictionary({
|
Dictionary::Ptr request = new Dictionary({
|
||||||
{ "command", "spawn" },
|
{ "command", "spawn" },
|
||||||
@ -374,7 +393,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
|
|||||||
String jrequest = JsonEncode(request);
|
String jrequest = JsonEncode(request);
|
||||||
size_t length = jrequest.GetLength();
|
size_t length = jrequest.GetLength();
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(l_ProcessControlMutex);
|
boost::mutex::scoped_lock lock(Mutex);
|
||||||
|
|
||||||
struct msghdr msg;
|
struct msghdr msg;
|
||||||
memset(&msg, 0, sizeof(msg));
|
memset(&msg, 0, sizeof(msg));
|
||||||
@ -400,14 +419,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
|
|||||||
msg.msg_controllen = cmsg->cmsg_len;
|
msg.msg_controllen = cmsg->cmsg_len;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
while (sendmsg(l_ProcessControlFD, &msg, 0) < 0) {
|
while (sendmsg(FD, &msg, 0) < 0) {
|
||||||
StartSpawnProcessHelper();
|
StartSpawnProcessHelper();
|
||||||
}
|
}
|
||||||
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
||||||
|
|
||||||
char buf[4096];
|
char buf[4096];
|
||||||
|
|
||||||
ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
|
ssize_t rc = recv(FD, buf, sizeof(buf), 0);
|
||||||
|
|
||||||
if (rc <= 0)
|
if (rc <= 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -422,7 +441,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
|
|||||||
return response->Get("rc");
|
return response->Get("rc");
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ProcessKill(pid_t pid, int signum)
|
int Spawner::ProcessKill(pid_t pid, int signum)
|
||||||
{
|
{
|
||||||
Dictionary::Ptr request = new Dictionary({
|
Dictionary::Ptr request = new Dictionary({
|
||||||
{ "command", "kill" },
|
{ "command", "kill" },
|
||||||
@ -433,17 +452,17 @@ static int ProcessKill(pid_t pid, int signum)
|
|||||||
String jrequest = JsonEncode(request);
|
String jrequest = JsonEncode(request);
|
||||||
size_t length = jrequest.GetLength();
|
size_t length = jrequest.GetLength();
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(l_ProcessControlMutex);
|
boost::mutex::scoped_lock lock(Mutex);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
|
while (send(FD, &length, sizeof(length), 0) < 0) {
|
||||||
StartSpawnProcessHelper();
|
StartSpawnProcessHelper();
|
||||||
}
|
}
|
||||||
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
||||||
|
|
||||||
char buf[4096];
|
char buf[4096];
|
||||||
|
|
||||||
ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
|
ssize_t rc = recv(FD, buf, sizeof(buf), 0);
|
||||||
|
|
||||||
if (rc <= 0)
|
if (rc <= 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -454,7 +473,7 @@ static int ProcessKill(pid_t pid, int signum)
|
|||||||
return response->Get("errno");
|
return response->Get("errno");
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ProcessWaitPID(pid_t pid, int *status)
|
int Spawner::ProcessWaitPID(pid_t pid, int *status)
|
||||||
{
|
{
|
||||||
Dictionary::Ptr request = new Dictionary({
|
Dictionary::Ptr request = new Dictionary({
|
||||||
{ "command", "waitpid" },
|
{ "command", "waitpid" },
|
||||||
@ -464,17 +483,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
|
|||||||
String jrequest = JsonEncode(request);
|
String jrequest = JsonEncode(request);
|
||||||
size_t length = jrequest.GetLength();
|
size_t length = jrequest.GetLength();
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(l_ProcessControlMutex);
|
boost::mutex::scoped_lock lock(Mutex);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
|
while (send(FD, &length, sizeof(length), 0) < 0) {
|
||||||
StartSpawnProcessHelper();
|
StartSpawnProcessHelper();
|
||||||
}
|
}
|
||||||
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
|
||||||
|
|
||||||
char buf[4096];
|
char buf[4096];
|
||||||
|
|
||||||
ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
|
ssize_t rc = recv(FD, buf, sizeof(buf), 0);
|
||||||
|
|
||||||
if (rc <= 0)
|
if (rc <= 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -488,8 +507,18 @@ static int ProcessWaitPID(pid_t pid, int *status)
|
|||||||
|
|
||||||
void Process::InitializeSpawnHelper()
|
void Process::InitializeSpawnHelper()
|
||||||
{
|
{
|
||||||
if (l_ProcessControlFD == -1)
|
if (!l_ProcessControl.Spawners) {
|
||||||
StartSpawnProcessHelper();
|
auto len (std::max(1, Configuration::Concurrency));
|
||||||
|
|
||||||
|
l_ProcessControl.Spawners = new Spawner[len];
|
||||||
|
l_ProcessControl.Len = len;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Spawner *current = l_ProcessControl.Spawners, *stop = l_ProcessControl.Spawners + l_ProcessControl.Len; current < stop; ++current) {
|
||||||
|
if (current->FD == -1) {
|
||||||
|
current->StartSpawnProcessHelper();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
|
|
||||||
@ -969,7 +998,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
|
|||||||
fds[1] = outfds[1];
|
fds[1] = outfds[1];
|
||||||
fds[2] = outfds[1];
|
fds[2] = outfds[1];
|
||||||
|
|
||||||
m_Process = ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
|
m_Process = MySpawner.ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
|
||||||
m_PID = m_Process;
|
m_PID = m_Process;
|
||||||
|
|
||||||
if (m_PID == -1) {
|
if (m_PID == -1) {
|
||||||
@ -1026,7 +1055,7 @@ bool Process::DoEvents()
|
|||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
TerminateProcess(m_Process, 3);
|
TerminateProcess(m_Process, 3);
|
||||||
#else /* _WIN32 */
|
#else /* _WIN32 */
|
||||||
int error = ProcessKill(-m_Process, SIGKILL);
|
int error = MySpawner.ProcessKill(-m_Process, SIGKILL);
|
||||||
if (error) {
|
if (error) {
|
||||||
Log(LogWarning, "Process")
|
Log(LogWarning, "Process")
|
||||||
<< "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
|
<< "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
|
||||||
@ -1080,7 +1109,7 @@ bool Process::DoEvents()
|
|||||||
int status, exitcode;
|
int status, exitcode;
|
||||||
if (could_not_kill || m_PID == -1) {
|
if (could_not_kill || m_PID == -1) {
|
||||||
exitcode = 128;
|
exitcode = 128;
|
||||||
} else if (ProcessWaitPID(m_Process, &status) != m_Process) {
|
} else if (MySpawner.ProcessWaitPID(m_Process, &status) != m_Process) {
|
||||||
exitcode = 128;
|
exitcode = 128;
|
||||||
|
|
||||||
Log(LogWarning, "Process")
|
Log(LogWarning, "Process")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user