From 3556f9afded3463b719740223e8a3a2402824022 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Fri, 14 Mar 2014 13:21:11 +0100 Subject: [PATCH] Use more threads for Process I/O. Refs #5748 --- lib/base/process-unix.cpp | 81 +++++++++++++++++++++++---------------- lib/base/process.h | 3 +- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/lib/base/process-unix.cpp b/lib/base/process-unix.cpp index 479ff69b2..13052229f 100644 --- a/lib/base/process-unix.cpp +++ b/lib/base/process-unix.cpp @@ -44,66 +44,74 @@ extern char **environ; #define environ (*_NSGetEnviron()) #endif /* __APPLE__ */ -static boost::mutex l_ProcessMutex; -static std::map l_Processes; -static int l_EventFDs[2]; +#define IOTHREADS 8 + +static boost::mutex l_ProcessMutex[IOTHREADS]; +static std::map l_Processes[IOTHREADS]; +static int l_EventFDs[IOTHREADS][2]; static boost::once_flag l_OnceFlag = BOOST_ONCE_INIT; INITIALIZE_ONCE(&Process::StaticInitialize); void Process::StaticInitialize(void) { -#ifdef HAVE_PIPE2 - if (pipe2(l_EventFDs, O_CLOEXEC) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("pipe2") - << boost::errinfo_errno(errno)); - } + for (int tid = 0; tid < IOTHREADS; tid++) { + #ifdef HAVE_PIPE2 + if (pipe2(l_EventFDs[tid], O_CLOEXEC) < 0) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("pipe2") + << boost::errinfo_errno(errno)); + } #else /* HAVE_PIPE2 */ - if (pipe(l_EventFDs) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("pipe") - << boost::errinfo_errno(errno)); - } + if (pipe(l_EventFDs[tid]) < 0) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("pipe") + << boost::errinfo_errno(errno)); + } - Utility::SetCloExec(l_EventFDs[0]); - Utility::SetCloExec(l_EventFDs[1]); + Utility::SetCloExec(l_EventFDs[tid][0]); + Utility::SetCloExec(l_EventFDs[tid][1]); #endif /* HAVE_PIPE2 */ - Utility::SetNonBlocking(l_EventFDs[0]); - Utility::SetNonBlocking(l_EventFDs[1]); + Utility::SetNonBlocking(l_EventFDs[tid][0]); + Utility::SetNonBlocking(l_EventFDs[tid][1]); + } } void Process::ThreadInitialize(void) { /* Note to self: Make sure this runs _after_ we've daemonized. */ - boost::thread t(&Process::IOThreadProc); - t.detach(); + for (int tid = 0; tid < IOTHREADS; tid++) { + boost::thread t(boost::bind(&Process::IOThreadProc, tid)); + t.detach(); + } } -void Process::IOThreadProc(void) +void Process::IOThreadProc(int tid) { pollfd *pfds = NULL; int count = 0; + Utility::SetThreadName("ProcessIO"); + for (;;) { double now, timeout = -1; now = Utility::GetTime(); { - boost::mutex::scoped_lock lock(l_ProcessMutex); + boost::mutex::scoped_lock lock(l_ProcessMutex[tid]); - count = 1 + l_Processes.size(); + count = 1 + l_Processes[tid].size(); pfds = reinterpret_cast(realloc(pfds, sizeof(pollfd) * count)); - pfds[0].fd = l_EventFDs[0]; + pfds[0].fd = l_EventFDs[tid][0]; pfds[0].events = POLLIN; pfds[0].revents = 0; int i = 1; std::pair kv; - BOOST_FOREACH(kv, l_Processes) { + BOOST_FOREACH(kv, l_Processes[tid]) { pfds[i].fd = kv.second->m_FD; pfds[i].events = POLLIN; pfds[i].revents = 0; @@ -125,24 +133,24 @@ void Process::IOThreadProc(void) continue; { - boost::mutex::scoped_lock lock(l_ProcessMutex); + boost::mutex::scoped_lock lock(l_ProcessMutex[tid]); if (pfds[0].revents & (POLLIN|POLLHUP|POLLERR)) { char buffer[512]; - (void) read(l_EventFDs[0], buffer, sizeof(buffer)); + (void) read(l_EventFDs[tid][0], buffer, sizeof(buffer)); } for (int i = 1; i < count; i++) { if (pfds[i].revents & (POLLIN|POLLHUP|POLLERR)) { std::map::iterator it; - it = l_Processes.find(pfds[i].fd); + it = l_Processes[tid].find(pfds[i].fd); - if (it == l_Processes.end()) + if (it == l_Processes[tid].end()) continue; /* This should never happen. */ if (!it->second->DoEvents()) { (void) close(it->first); - l_Processes.erase(it); + l_Processes[tid].erase(it); } } } @@ -273,12 +281,14 @@ void Process::Run(const boost::function& callback) m_FD = fds[0]; m_Callback = callback; + int tid = GetTID(); + { - boost::mutex::scoped_lock lock(l_ProcessMutex); - l_Processes[m_FD] = GetSelf(); + boost::mutex::scoped_lock lock(l_ProcessMutex[tid]); + l_Processes[tid][m_FD] = GetSelf(); } - (void) write(l_EventFDs[1], "T", 1); + (void) write(l_EventFDs[tid][1], "T", 1); } bool Process::DoEvents(void) @@ -338,4 +348,9 @@ bool Process::DoEvents(void) return false; } +int Process::GetTID(void) const +{ + return (reinterpret_cast(this) / sizeof(void *)) % IOTHREADS; +} + #endif /* _WIN32 */ diff --git a/lib/base/process.h b/lib/base/process.h index c8e18f0c5..a7cae9fed 100644 --- a/lib/base/process.h +++ b/lib/base/process.h @@ -85,8 +85,9 @@ private: boost::function m_Callback; ProcessResult m_Result; - static void IOThreadProc(void); + static void IOThreadProc(int tid); bool DoEvents(void); + int GetTID(void) const; #endif /* _WIN32 */ };