Implemented poll() support for Process tasks.

Fixes #3035
This commit is contained in:
Gunnar Beutner 2013-02-10 23:31:11 +01:00
parent 7ce98ed374
commit bddd9ebf0b
2 changed files with 75 additions and 55 deletions

View File

@ -143,34 +143,36 @@ void Process::Run(void)
void Process::WorkerThreadProc(int taskFd) void Process::WorkerThreadProc(int taskFd)
{ {
map<int, Process::Ptr> tasks; map<int, Process::Ptr> tasks;
pollfd *pfds;
for (;;) { for (;;) {
map<int, Process::Ptr>::iterator it, prev; map<int, Process::Ptr>::iterator it, prev;
#ifndef _MSC_VER #ifndef _MSC_VER
fd_set readfds; pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd));
int nfds = 0;
FD_ZERO(&readfds); if (pfds == NULL)
BOOST_THROW_EXCEPTION(PosixException("realloc() failed.", errno));
if (tasks.size() < MaxTasksPerThread) int idx = 0;
FD_SET(taskFd, &readfds);
if (taskFd > nfds) if (tasks.size() < MaxTasksPerThread) {
nfds = taskFd; pfds[idx].fd = taskFd;
pfds[idx].events = POLLIN;
idx++;
}
int fd; int fd;
BOOST_FOREACH(tie(fd, tuples::ignore), tasks) { BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
if (fd > nfds) pfds[idx].fd = fd;
nfds = fd; pfds[idx].events = POLLIN;
idx++;
FD_SET(fd, &readfds);
} }
int rc = select(nfds + 1, &readfds, NULL, NULL, NULL); int rc = poll(pfds, idx, -1);
if (rc < 0 && errno != EINTR) if (rc < 0 && errno != EINTR)
BOOST_THROW_EXCEPTION(PosixException("select() failed.", errno)); BOOST_THROW_EXCEPTION(PosixException("poll() failed.", errno));
if (rc == 0) if (rc == 0)
continue; continue;
@ -180,67 +182,84 @@ void Process::WorkerThreadProc(int taskFd)
#endif /* _MSC_VER */ #endif /* _MSC_VER */
#ifndef _MSC_VER #ifndef _MSC_VER
if (FD_ISSET(taskFd, &readfds)) { for (int i = 0; i < idx; i++) {
if ((pfds[i].revents & POLLIN) == 0)
continue;
if (pfds[i].fd == taskFd) {
#endif /* _MSC_VER */ #endif /* _MSC_VER */
while (tasks.size() < MaxTasksPerThread) { while (tasks.size() < MaxTasksPerThread) {
Process::Ptr task; Process::Ptr task;
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
/* Read one byte for every task we take from the pending tasks list. */ #ifndef _MSC_VER
char buffer; /* Read one byte for every task we take from the pending tasks list. */
int rc = read(taskFd, &buffer, sizeof(buffer)); char buffer;
int rc = read(taskFd, &buffer, sizeof(buffer));
if (rc < 0) { if (rc < 0) {
if (errno == EAGAIN) if (errno == EAGAIN)
break; /* Someone else was faster and took our task. */ break; /* Someone else was faster and took our task. */
BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
}
assert(!m_Tasks.empty());
#else /* _MSC_VER */
if (m_Tasks.empty())
break;
#endif /* _MSC_VER */
task = m_Tasks.front();
m_Tasks.pop_front();
} }
assert(!m_Tasks.empty()); try {
task->InitTask();
task = m_Tasks.front(); int fd = fileno(task->m_FP);
m_Tasks.pop_front(); if (fd >= 0)
tasks[fd] = task;
} catch (...) {
Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
}
} }
try {
task->InitTask();
int fd = fileno(task->m_FP);
if (fd >= 0)
tasks[fd] = task;
} catch (...) {
Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
}
}
#ifndef _MSC_VER #ifndef _MSC_VER
}
#endif /* _MSC_VER */
for (it = tasks.begin(); it != tasks.end(); ) {
int fd = it->first;
Process::Ptr task = it->second;
#ifndef _MSC_VER
if (!FD_ISSET(fd, &readfds)) {
it++;
continue; continue;
} }
it = tasks.find(pfds[i].fd);
if (it == tasks.end())
continue;
#else /* _MSC_VER */
for (it = tasks.begin(); it != tasks.end(); ) {
int fd = it->first;
#endif /* _MSC_VER */ #endif /* _MSC_VER */
Process::Ptr task = it->second;
if (!task->RunTask()) { if (!task->RunTask()) {
prev = it; prev = it;
it++; #ifdef _MSC_VER
tasks.erase(prev); it++;
#endif /* _MSC_VER */
tasks.erase(prev);
Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
} else { #ifdef _MSC_VER
it++; } else {
it++;
#endif /* _MSC_VER */
}
#ifdef _MSC_VER
} }
#else /* _MSC_VER */
} }
#endif /* _MSC_VER */
} }
} }

View File

@ -35,6 +35,7 @@
#include <syslog.h> #include <syslog.h>
#include <sys/file.h> #include <sys/file.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <poll.h>
#include <glob.h> #include <glob.h>
#include <ltdl.h> #include <ltdl.h>