mirror of https://github.com/Icinga/icinga2.git
Use multiple worker threads for Process tasks.
This commit is contained in:
parent
aa2322abbb
commit
7ce98ed374
|
@ -39,22 +39,38 @@ Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEn
|
||||||
assert(Application::IsMainThread());
|
assert(Application::IsMainThread());
|
||||||
|
|
||||||
if (!m_ThreadCreated) {
|
if (!m_ThreadCreated) {
|
||||||
int childTaskFd;
|
#ifndef _MSC_VER
|
||||||
|
|
||||||
#ifdef _MSC_VER
|
|
||||||
childTaskFd = 0;
|
|
||||||
#else /* _MSC_VER */
|
|
||||||
int fds[2];
|
int fds[2];
|
||||||
|
|
||||||
if (pipe(fds) < 0)
|
if (pipe(fds) < 0)
|
||||||
BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno));
|
BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno));
|
||||||
|
|
||||||
childTaskFd = fds[0];
|
|
||||||
m_TaskFd = fds[1];
|
m_TaskFd = fds[1];
|
||||||
#endif /* _MSC_VER */
|
#endif /* _MSC_VER */
|
||||||
|
|
||||||
|
for (int i = 0; i < thread::hardware_concurrency(); i++) {
|
||||||
|
int childTaskFd;
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
childTaskFd = 0;
|
||||||
|
#else /* _MSC_VER */
|
||||||
|
childTaskFd = dup(fds[0]);
|
||||||
|
|
||||||
|
if (childTaskFd < 0)
|
||||||
|
BOOST_THROW_EXCEPTION(PosixException("dup() failed.", errno));
|
||||||
|
|
||||||
|
int flags;
|
||||||
|
flags = fcntl(childTaskFd, F_GETFL, 0);
|
||||||
|
if (flags < 0)
|
||||||
|
BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno));
|
||||||
|
|
||||||
|
if (fcntl(childTaskFd, F_SETFL, flags | O_NONBLOCK) < 0)
|
||||||
|
BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno));
|
||||||
|
#endif /* _MSC_VER */
|
||||||
|
|
||||||
thread t(&Process::WorkerThreadProc, childTaskFd);
|
thread t(&Process::WorkerThreadProc, childTaskFd);
|
||||||
t.detach();
|
t.detach();
|
||||||
|
}
|
||||||
|
|
||||||
m_ThreadCreated = true;
|
m_ThreadCreated = true;
|
||||||
}
|
}
|
||||||
|
@ -166,28 +182,25 @@ void Process::WorkerThreadProc(int taskFd)
|
||||||
#ifndef _MSC_VER
|
#ifndef _MSC_VER
|
||||||
if (FD_ISSET(taskFd, &readfds)) {
|
if (FD_ISSET(taskFd, &readfds)) {
|
||||||
#endif /* _MSC_VER */
|
#endif /* _MSC_VER */
|
||||||
/* Figure out how many tasks we'd ideally want. */
|
|
||||||
int tasknum = MaxTasksPerThread - tasks.size();
|
|
||||||
|
|
||||||
#ifndef _MSC_VER
|
while (tasks.size() < MaxTasksPerThread) {
|
||||||
/* Read one byte for every task we take from the pending tasks list. */
|
|
||||||
char buffer[MaxTasksPerThread];
|
|
||||||
tasknum = read(taskFd, &buffer, tasknum);
|
|
||||||
|
|
||||||
if (tasknum < 0)
|
|
||||||
BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
|
|
||||||
|
|
||||||
assert(tasknum >= 1);
|
|
||||||
#endif /* _MSC_VER */
|
|
||||||
|
|
||||||
for (int i = 0; i < tasknum; i++) {
|
|
||||||
Process::Ptr task;
|
Process::Ptr task;
|
||||||
|
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
boost::mutex::scoped_lock lock(m_Mutex);
|
||||||
|
|
||||||
if (m_Tasks.empty())
|
/* Read one byte for every task we take from the pending tasks list. */
|
||||||
break;
|
char buffer;
|
||||||
|
int rc = read(taskFd, &buffer, sizeof(buffer));
|
||||||
|
|
||||||
|
if (rc < 0) {
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
break; /* Someone else was faster and took our task. */
|
||||||
|
|
||||||
|
BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!m_Tasks.empty());
|
||||||
|
|
||||||
task = m_Tasks.front();
|
task = m_Tasks.front();
|
||||||
m_Tasks.pop_front();
|
m_Tasks.pop_front();
|
||||||
|
|
Loading…
Reference in New Issue