diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 2ee52b542..d57ae7b73 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -24,7 +24,7 @@ using namespace icinga; bool Process::m_ThreadCreated = false; boost::mutex Process::m_Mutex; deque Process::m_Tasks; -condition_variable Process::m_TasksCV; +int Process::m_TaskFd; Process::Process(const String& command, const Dictionary::Ptr& environment) : AsyncTask(), m_Command(command), @@ -33,7 +33,14 @@ Process::Process(const String& command, const Dictionary::Ptr& environment) assert(Application::IsMainThread()); if (!m_ThreadCreated) { - thread t(&Process::WorkerThreadProc); + int fds[2]; + + if (pipe(fds) < 0) + BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno)); + + m_TaskFd = fds[1]; + + thread t(&Process::WorkerThreadProc, fds[0]); t.detach(); m_ThreadCreated = true; @@ -42,87 +49,103 @@ Process::Process(const String& command, const Dictionary::Ptr& environment) void Process::Run(void) { - boost::mutex::scoped_lock lock(m_Mutex); - m_Tasks.push_back(GetSelf()); - m_TasksCV.notify_all(); + { + boost::mutex::scoped_lock lock(m_Mutex); + m_Tasks.push_back(GetSelf()); + } + + /** + * This little gem which is commonly known as the "self-pipe trick" + * takes care of waking up the select() call in the worker thread. + */ + if (write(m_TaskFd, "T", 1) < 0) + BOOST_THROW_EXCEPTION(PosixException("write() failed.", errno)); } -void Process::WorkerThreadProc(void) +void Process::WorkerThreadProc(int taskFd) { - boost::mutex::scoped_lock lock(m_Mutex); - map tasks; for (;;) { - while (m_Tasks.empty() || tasks.size() >= MaxTasksPerThread) { - lock.unlock(); - - map::iterator it, prev; + map::iterator it, prev; #ifndef _MSC_VER - fd_set readfds; - int nfds = 0; + fd_set readfds; + int nfds = 0; - FD_ZERO(&readfds); + FD_ZERO(&readfds); + FD_SET(taskFd, &readfds); - int fd; - BOOST_FOREACH(tie(fd, tuples::ignore), tasks) { - if (fd > nfds) - nfds = fd; + if (taskFd > nfds) + nfds = taskFd; - FD_SET(fd, &readfds); - } + int fd; + BOOST_FOREACH(tie(fd, tuples::ignore), tasks) { + if (fd > nfds) + nfds = fd; - timeval tv; - tv.tv_sec = 1; - tv.tv_usec = 0; - select(nfds + 1, &readfds, NULL, NULL, &tv); -#else /* _MSC_VER */ - Utility::Sleep(1); -#endif /* _MSC_VER */ - - for (it = tasks.begin(); it != tasks.end(); ) { - int fd = it->first; - Process::Ptr task = it->second; - -#ifndef _MSC_VER - if (!FD_ISSET(fd, &readfds)) { - it++; - continue; - } -#endif /* _MSC_VER */ - - if (!task->RunTask()) { - prev = it; - it++; - tasks.erase(prev); - - Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); - } else { - it++; - } - } - - lock.lock(); + FD_SET(fd, &readfds); } - while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) { - Process::Ptr task = m_Tasks.front(); - m_Tasks.pop_front(); + timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + select(nfds + 1, &readfds, NULL, NULL, &tv); +#else /* _MSC_VER */ + Utility::Sleep(1); +#endif /* _MSC_VER */ - lock.unlock(); + if (FD_ISSET(taskFd, &readfds)) { + /* clear pipe */ + char buffer[512]; + int rc = read(taskFd, buffer, sizeof(buffer)); + assert(rc >= 1); - try { - task->InitTask(); + while (tasks.size() < MaxTasksPerThread) { + Process::Ptr task; - int fd = task->GetFD(); - if (fd >= 0) - tasks[fd] = task; - } catch (...) { - Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception())); + { + boost::mutex::scoped_lock lock(m_Mutex); + + if (m_Tasks.empty()) + break; + + task = m_Tasks.front(); + m_Tasks.pop_front(); + } + + try { + task->InitTask(); + + int fd = task->GetFD(); + if (fd >= 0) + tasks[fd] = task; + } catch (...) { + Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception())); + } } + } - lock.lock(); + for (it = tasks.begin(); it != tasks.end(); ) { + int fd = it->first; + Process::Ptr task = it->second; + +#ifndef _MSC_VER + if (!FD_ISSET(fd, &readfds)) { + it++; + continue; + } +#endif /* _MSC_VER */ + + if (!task->RunTask()) { + prev = it; + it++; + tasks.erase(prev); + + Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); + } else { + it++; + } } } } diff --git a/lib/base/process.h b/lib/base/process.h index 1c0c2f569..60443458b 100644 --- a/lib/base/process.h +++ b/lib/base/process.h @@ -71,9 +71,9 @@ private: static boost::mutex m_Mutex; static deque m_Tasks; - static condition_variable m_TasksCV; + static int m_TaskFd; - static void WorkerThreadProc(void); + static void WorkerThreadProc(int taskFd); void InitTask(void); bool RunTask(void);