diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 3fd771f89..6ae7ea268 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -136,7 +136,9 @@ void Process::WorkerThreadProc(int taskFd) int nfds = 0; FD_ZERO(&readfds); - FD_SET(taskFd, &readfds); + + if (tasks.size() < MaxTasksPerThread) + FD_SET(taskFd, &readfds); if (taskFd > nfds) nfds = taskFd; @@ -149,23 +151,36 @@ void Process::WorkerThreadProc(int taskFd) FD_SET(fd, &readfds); } - timeval tv; - tv.tv_sec = 1; - tv.tv_usec = 0; - select(nfds + 1, &readfds, NULL, NULL, &tv); + int rc = select(nfds + 1, &readfds, NULL, NULL, NULL); + + if (rc < 0 && errno != EINTR) + BOOST_THROW_EXCEPTION(PosixException("select() failed.", errno)); + + if (rc == 0) + continue; + #else /* _MSC_VER */ Utility::Sleep(1); #endif /* _MSC_VER */ #ifndef _MSC_VER if (FD_ISSET(taskFd, &readfds)) { - /* clear pipe */ - char buffer[512]; - int rc = read(taskFd, buffer, sizeof(buffer)); - assert(rc >= 1); +#endif /* _MSC_VER */ + /* Figure out how many tasks we'd ideally want. */ + int tasknum = MaxTasksPerThread - tasks.size(); + +#ifndef _MSC_VER + /* Read one byte for every task we take from the pending tasks list. */ + char buffer[MaxTasksPerThread]; + tasknum = read(taskFd, &buffer, tasknum); + + if (tasknum < 0) + BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); + + assert(tasknum >= 1); #endif /* _MSC_VER */ - while (tasks.size() < MaxTasksPerThread) { + for (int i = 0; i < tasknum; i++) { Process::Ptr task; {