diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 6f36da913..b2b395f0f 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -3,6 +3,7 @@ #include "base/process.hpp" #include "base/exception.hpp" #include "base/convert.hpp" +#include "base/configuration.hpp" #include "base/array.hpp" #include "base/objectlock.hpp" #include "base/utility.hpp" @@ -11,8 +12,10 @@ #include "base/utility.hpp" #include "base/scriptglobal.hpp" #include "base/json.hpp" +#include #include #include +#include #include #include @@ -32,6 +35,21 @@ extern char **environ; using namespace icinga; #define IOTHREADS 4 +#define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this) / sizeof(void*) % l_ProcessControl.Len] + +struct Spawner +{ + boost::mutex Mutex; + int FD = -1; + pid_t PID = -1; + + void StartSpawnProcessHelper(); + void ProcessHandler(); + pid_t ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]); + int ProcessKill(pid_t pid, int signum); + int ProcessWaitPID(pid_t pid, int *status); + Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request); +}; static boost::mutex l_ProcessMutex[IOTHREADS]; static std::map l_Processes[IOTHREADS]; @@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS]; static int l_EventFDs[IOTHREADS][2]; static std::map l_FDs[IOTHREADS]; -static boost::mutex l_ProcessControlMutex; -static int l_ProcessControlFD = -1; -static pid_t l_ProcessControlPID; +static struct { + Spawner* Spawners = nullptr; + size_t Len = 0; +} l_ProcessControl; #endif /* _WIN32 */ static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT; static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT; @@ -67,7 +86,7 @@ Process::~Process() } #ifndef _WIN32 -static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request) +Value Spawner::ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request) { struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh); @@ -130,7 +149,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques if (pid == 0) { // child process - (void)close(l_ProcessControlFD); + (void)close(FD); if (setsid() < 0) { perror("setsid() failed"); @@ -226,7 +245,7 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ return response; } -static void ProcessHandler() +void Spawner::ProcessHandler() { sigset_t mask; sigfillset(&mask); @@ -240,7 +259,7 @@ static void ProcessHandler() maxfds = 65536; for (rlim_t i = 3; i < maxfds; i++) - if (i != static_cast(l_ProcessControlFD)) + if (i != static_cast(FD)) (void)close(i); } @@ -261,7 +280,7 @@ static void ProcessHandler() msg.msg_control = cbuf; msg.msg_controllen = sizeof(cbuf); - int rc = recvmsg(l_ProcessControlFD, &msg, 0); + int rc = recvmsg(FD, &msg, 0); if (rc <= 0) { if (rc < 0 && (errno == EINTR || errno == EAGAIN)) @@ -274,7 +293,7 @@ static void ProcessHandler() size_t count = 0; while (count < length) { - rc = recv(l_ProcessControlFD, mbuf + count, length - count, 0); + rc = recv(FD, mbuf + count, length - count, 0); if (rc <= 0) { if (rc < 0 && (errno == EINTR || errno == EAGAIN)) @@ -312,7 +331,7 @@ static void ProcessHandler() String jresponse = JsonEncode(response); - if (send(l_ProcessControlFD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) { + if (send(FD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) { BOOST_THROW_EXCEPTION(posix_error() << boost::errinfo_api_function("send") << boost::errinfo_errno(errno)); @@ -322,13 +341,13 @@ static void ProcessHandler() _exit(0); } -static void StartSpawnProcessHelper() +void Spawner::StartSpawnProcessHelper() { - if (l_ProcessControlFD != -1) { - (void)close(l_ProcessControlFD); + if (FD != -1) { + (void)close(FD); int status; - (void)waitpid(l_ProcessControlPID, &status, 0); + (void)waitpid(PID, &status, 0); } int controlFDs[2]; @@ -349,7 +368,7 @@ static void StartSpawnProcessHelper() if (pid == 0) { (void)close(controlFDs[1]); - l_ProcessControlFD = controlFDs[0]; + FD = controlFDs[0]; ProcessHandler(); @@ -358,11 +377,11 @@ static void StartSpawnProcessHelper() (void)close(controlFDs[0]); - l_ProcessControlFD = controlFDs[1]; - l_ProcessControlPID = pid; + FD = controlFDs[1]; + PID = pid; } -static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]) +pid_t Spawner::ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]) { Dictionary::Ptr request = new Dictionary({ { "command", "spawn" }, @@ -374,7 +393,7 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - boost::mutex::scoped_lock lock(l_ProcessControlMutex); + boost::mutex::scoped_lock lock(Mutex); struct msghdr msg; memset(&msg, 0, sizeof(msg)); @@ -400,14 +419,14 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary msg.msg_controllen = cmsg->cmsg_len; do { - while (sendmsg(l_ProcessControlFD, &msg, 0) < 0) { + while (sendmsg(FD, &msg, 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -422,7 +441,7 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary return response->Get("rc"); } -static int ProcessKill(pid_t pid, int signum) +int Spawner::ProcessKill(pid_t pid, int signum) { Dictionary::Ptr request = new Dictionary({ { "command", "kill" }, @@ -433,17 +452,17 @@ static int ProcessKill(pid_t pid, int signum) String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - boost::mutex::scoped_lock lock(l_ProcessControlMutex); + boost::mutex::scoped_lock lock(Mutex); do { - while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) { + while (send(FD, &length, sizeof(length), 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -454,7 +473,7 @@ static int ProcessKill(pid_t pid, int signum) return response->Get("errno"); } -static int ProcessWaitPID(pid_t pid, int *status) +int Spawner::ProcessWaitPID(pid_t pid, int *status) { Dictionary::Ptr request = new Dictionary({ { "command", "waitpid" }, @@ -464,17 +483,17 @@ static int ProcessWaitPID(pid_t pid, int *status) String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - boost::mutex::scoped_lock lock(l_ProcessControlMutex); + boost::mutex::scoped_lock lock(Mutex); do { - while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) { + while (send(FD, &length, sizeof(length), 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -488,8 +507,18 @@ static int ProcessWaitPID(pid_t pid, int *status) void Process::InitializeSpawnHelper() { - if (l_ProcessControlFD == -1) - StartSpawnProcessHelper(); + if (!l_ProcessControl.Spawners) { + auto len (std::max(1, Configuration::Concurrency)); + + l_ProcessControl.Spawners = new Spawner[len]; + l_ProcessControl.Len = len; + } + + for (Spawner *current = l_ProcessControl.Spawners, *stop = l_ProcessControl.Spawners + l_ProcessControl.Len; current < stop; ++current) { + if (current->FD == -1) { + current->StartSpawnProcessHelper(); + } + } } #endif /* _WIN32 */ @@ -969,7 +998,7 @@ void Process::Run(const std::function& callback) fds[1] = outfds[1]; fds[2] = outfds[1]; - m_Process = ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds); + m_Process = MySpawner.ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds); m_PID = m_Process; if (m_PID == -1) { @@ -1026,7 +1055,7 @@ bool Process::DoEvents() #ifdef _WIN32 TerminateProcess(m_Process, 3); #else /* _WIN32 */ - int error = ProcessKill(-m_Process, SIGKILL); + int error = MySpawner.ProcessKill(-m_Process, SIGKILL); if (error) { Log(LogWarning, "Process") << "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments) @@ -1080,7 +1109,7 @@ bool Process::DoEvents() int status, exitcode; if (could_not_kill || m_PID == -1) { exitcode = 128; - } else if (ProcessWaitPID(m_Process, &status) != m_Process) { + } else if (MySpawner.ProcessWaitPID(m_Process, &status) != m_Process) { exitcode = 128; Log(LogWarning, "Process")