From bddd9ebf0b153ebfec1134a551a93c8e91576dd7 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Sun, 10 Feb 2013 23:31:11 +0100 Subject: [PATCH] Implemented poll() support for Process tasks. Fixes #3035 --- lib/base/process.cpp | 129 +++++++++++++++++++++++++------------------ lib/base/unix.h | 1 + 2 files changed, 75 insertions(+), 55 deletions(-) diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 882fb77bd..0efc5856c 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -143,34 +143,36 @@ void Process::Run(void) void Process::WorkerThreadProc(int taskFd) { map tasks; + pollfd *pfds; for (;;) { map::iterator it, prev; #ifndef _MSC_VER - fd_set readfds; - int nfds = 0; + pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd)); - FD_ZERO(&readfds); + if (pfds == NULL) + BOOST_THROW_EXCEPTION(PosixException("realloc() failed.", errno)); - if (tasks.size() < MaxTasksPerThread) - FD_SET(taskFd, &readfds); + int idx = 0; - if (taskFd > nfds) - nfds = taskFd; + if (tasks.size() < MaxTasksPerThread) { + pfds[idx].fd = taskFd; + pfds[idx].events = POLLIN; + idx++; + } int fd; BOOST_FOREACH(tie(fd, tuples::ignore), tasks) { - if (fd > nfds) - nfds = fd; - - FD_SET(fd, &readfds); + pfds[idx].fd = fd; + pfds[idx].events = POLLIN; + idx++; } - int rc = select(nfds + 1, &readfds, NULL, NULL, NULL); + int rc = poll(pfds, idx, -1); if (rc < 0 && errno != EINTR) - BOOST_THROW_EXCEPTION(PosixException("select() failed.", errno)); + BOOST_THROW_EXCEPTION(PosixException("poll() failed.", errno)); if (rc == 0) continue; @@ -180,67 +182,84 @@ void Process::WorkerThreadProc(int taskFd) #endif /* _MSC_VER */ #ifndef _MSC_VER - if (FD_ISSET(taskFd, &readfds)) { + for (int i = 0; i < idx; i++) { + if ((pfds[i].revents & POLLIN) == 0) + continue; + + if (pfds[i].fd == taskFd) { #endif /* _MSC_VER */ - while (tasks.size() < MaxTasksPerThread) { - Process::Ptr task; + while (tasks.size() < MaxTasksPerThread) { + Process::Ptr task; - { - boost::mutex::scoped_lock lock(m_Mutex); + { + boost::mutex::scoped_lock lock(m_Mutex); - /* Read one byte for every task we take from the pending tasks list. */ - char buffer; - int rc = read(taskFd, &buffer, sizeof(buffer)); +#ifndef _MSC_VER + /* Read one byte for every task we take from the pending tasks list. */ + char buffer; + int rc = read(taskFd, &buffer, sizeof(buffer)); - if (rc < 0) { - if (errno == EAGAIN) - break; /* Someone else was faster and took our task. */ + if (rc < 0) { + if (errno == EAGAIN) + break; /* Someone else was faster and took our task. */ - BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); + BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno)); + } + + assert(!m_Tasks.empty()); +#else /* _MSC_VER */ + if (m_Tasks.empty()) + break; +#endif /* _MSC_VER */ + + task = m_Tasks.front(); + m_Tasks.pop_front(); } - assert(!m_Tasks.empty()); + try { + task->InitTask(); - task = m_Tasks.front(); - m_Tasks.pop_front(); + int fd = fileno(task->m_FP); + if (fd >= 0) + tasks[fd] = task; + } catch (...) { + Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception())); + } } - - try { - task->InitTask(); - - int fd = fileno(task->m_FP); - if (fd >= 0) - tasks[fd] = task; - } catch (...) { - Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception())); - } - } #ifndef _MSC_VER - } -#endif /* _MSC_VER */ - for (it = tasks.begin(); it != tasks.end(); ) { - int fd = it->first; - Process::Ptr task = it->second; - -#ifndef _MSC_VER - if (!FD_ISSET(fd, &readfds)) { - it++; continue; } + + it = tasks.find(pfds[i].fd); + + if (it == tasks.end()) + continue; +#else /* _MSC_VER */ + for (it = tasks.begin(); it != tasks.end(); ) { + int fd = it->first; #endif /* _MSC_VER */ + Process::Ptr task = it->second; - if (!task->RunTask()) { - prev = it; - it++; - tasks.erase(prev); + if (!task->RunTask()) { + prev = it; +#ifdef _MSC_VER + it++; +#endif /* _MSC_VER */ + tasks.erase(prev); - Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); - } else { - it++; + Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); +#ifdef _MSC_VER + } else { + it++; +#endif /* _MSC_VER */ + } +#ifdef _MSC_VER } +#else /* _MSC_VER */ } +#endif /* _MSC_VER */ } } diff --git a/lib/base/unix.h b/lib/base/unix.h index e39af513d..af58c92fe 100644 --- a/lib/base/unix.h +++ b/lib/base/unix.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include