diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 6ae7ea268..882fb77bd 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -39,22 +39,38 @@ Process::Process(const vector& arguments, const Dictionary::Ptr& extraEn assert(Application::IsMainThread()); if (!m_ThreadCreated) { - int childTaskFd; - -#ifdef _MSC_VER - childTaskFd = 0; -#else /* _MSC_VER */ +#ifndef _MSC_VER int fds[2]; if (pipe(fds) < 0) BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno)); - childTaskFd = fds[0]; m_TaskFd = fds[1]; #endif /* _MSC_VER */ - thread t(&Process::WorkerThreadProc, childTaskFd); - t.detach(); + for (int i = 0; i < thread::hardware_concurrency(); i++) { + int childTaskFd; + +#ifdef _MSC_VER + childTaskFd = 0; +#else /* _MSC_VER */ + childTaskFd = dup(fds[0]); + + if (childTaskFd < 0) + BOOST_THROW_EXCEPTION(PosixException("dup() failed.", errno)); + + int flags; + flags = fcntl(childTaskFd, F_GETFL, 0); + if (flags < 0) + BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno)); + + if (fcntl(childTaskFd, F_SETFL, flags | O_NONBLOCK) < 0) + BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno)); +#endif /* _MSC_VER */ + + thread t(&Process::WorkerThreadProc, childTaskFd); + t.detach(); + } m_ThreadCreated = true; } @@ -166,28 +182,25 @@ void Process::WorkerThreadProc(int taskFd) #ifndef _MSC_VER if (FD_ISSET(taskFd, &readfds)) { #endif /* _MSC_VER */ - /* Figure out how many tasks we'd ideally want. */ - int tasknum = MaxTasksPerThread - tasks.size(); -#ifndef _MSC_VER - /* Read one byte for every task we take from the pending tasks list. */ - char buffer[MaxTasksPerThread]; - tasknum = read(taskFd, &buffer, tasknum); - - if (tasknum < 0) - BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); - - assert(tasknum >= 1); -#endif /* _MSC_VER */ - - for (int i = 0; i < tasknum; i++) { + while (tasks.size() < MaxTasksPerThread) { Process::Ptr task; { boost::mutex::scoped_lock lock(m_Mutex); - if (m_Tasks.empty()) - break; + /* Read one byte for every task we take from the pending tasks list. */ + char buffer; + int rc = read(taskFd, &buffer, sizeof(buffer)); + + if (rc < 0) { + if (errno == EAGAIN) + break; /* Someone else was faster and took our task. */ + + BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); + } + + assert(!m_Tasks.empty()); task = m_Tasks.front(); m_Tasks.pop_front();