Use more threads for Process I/O.

Refs #5748
This commit is contained in:
Gunnar Beutner 2014-03-14 13:21:11 +01:00
parent 5252041c6b
commit 3556f9afde
2 changed files with 50 additions and 34 deletions

View File

@ -44,66 +44,74 @@ extern char **environ;
#define environ (*_NSGetEnviron())
#endif /* __APPLE__ */
static boost::mutex l_ProcessMutex;
static std::map<int, Process::Ptr> l_Processes;
static int l_EventFDs[2];
#define IOTHREADS 8
static boost::mutex l_ProcessMutex[IOTHREADS];
static std::map<int, Process::Ptr> l_Processes[IOTHREADS];
static int l_EventFDs[IOTHREADS][2];
static boost::once_flag l_OnceFlag = BOOST_ONCE_INIT;
INITIALIZE_ONCE(&Process::StaticInitialize);
void Process::StaticInitialize(void)
{
#ifdef HAVE_PIPE2
if (pipe2(l_EventFDs, O_CLOEXEC) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("pipe2")
<< boost::errinfo_errno(errno));
}
for (int tid = 0; tid < IOTHREADS; tid++) {
#ifdef HAVE_PIPE2
if (pipe2(l_EventFDs[tid], O_CLOEXEC) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("pipe2")
<< boost::errinfo_errno(errno));
}
#else /* HAVE_PIPE2 */
if (pipe(l_EventFDs) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("pipe")
<< boost::errinfo_errno(errno));
}
if (pipe(l_EventFDs[tid]) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("pipe")
<< boost::errinfo_errno(errno));
}
Utility::SetCloExec(l_EventFDs[0]);
Utility::SetCloExec(l_EventFDs[1]);
Utility::SetCloExec(l_EventFDs[tid][0]);
Utility::SetCloExec(l_EventFDs[tid][1]);
#endif /* HAVE_PIPE2 */
Utility::SetNonBlocking(l_EventFDs[0]);
Utility::SetNonBlocking(l_EventFDs[1]);
Utility::SetNonBlocking(l_EventFDs[tid][0]);
Utility::SetNonBlocking(l_EventFDs[tid][1]);
}
}
void Process::ThreadInitialize(void)
{
/* Note to self: Make sure this runs _after_ we've daemonized. */
boost::thread t(&Process::IOThreadProc);
t.detach();
for (int tid = 0; tid < IOTHREADS; tid++) {
boost::thread t(boost::bind(&Process::IOThreadProc, tid));
t.detach();
}
}
void Process::IOThreadProc(void)
void Process::IOThreadProc(int tid)
{
pollfd *pfds = NULL;
int count = 0;
Utility::SetThreadName("ProcessIO");
for (;;) {
double now, timeout = -1;
now = Utility::GetTime();
{
boost::mutex::scoped_lock lock(l_ProcessMutex);
boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
count = 1 + l_Processes.size();
count = 1 + l_Processes[tid].size();
pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
pfds[0].fd = l_EventFDs[0];
pfds[0].fd = l_EventFDs[tid][0];
pfds[0].events = POLLIN;
pfds[0].revents = 0;
int i = 1;
std::pair<int, Process::Ptr> kv;
BOOST_FOREACH(kv, l_Processes) {
BOOST_FOREACH(kv, l_Processes[tid]) {
pfds[i].fd = kv.second->m_FD;
pfds[i].events = POLLIN;
pfds[i].revents = 0;
@ -125,24 +133,24 @@ void Process::IOThreadProc(void)
continue;
{
boost::mutex::scoped_lock lock(l_ProcessMutex);
boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
if (pfds[0].revents & (POLLIN|POLLHUP|POLLERR)) {
char buffer[512];
(void) read(l_EventFDs[0], buffer, sizeof(buffer));
(void) read(l_EventFDs[tid][0], buffer, sizeof(buffer));
}
for (int i = 1; i < count; i++) {
if (pfds[i].revents & (POLLIN|POLLHUP|POLLERR)) {
std::map<int, Process::Ptr>::iterator it;
it = l_Processes.find(pfds[i].fd);
it = l_Processes[tid].find(pfds[i].fd);
if (it == l_Processes.end())
if (it == l_Processes[tid].end())
continue; /* This should never happen. */
if (!it->second->DoEvents()) {
(void) close(it->first);
l_Processes.erase(it);
l_Processes[tid].erase(it);
}
}
}
@ -273,12 +281,14 @@ void Process::Run(const boost::function<void (const ProcessResult&)>& callback)
m_FD = fds[0];
m_Callback = callback;
int tid = GetTID();
{
boost::mutex::scoped_lock lock(l_ProcessMutex);
l_Processes[m_FD] = GetSelf();
boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
l_Processes[tid][m_FD] = GetSelf();
}
(void) write(l_EventFDs[1], "T", 1);
(void) write(l_EventFDs[tid][1], "T", 1);
}
bool Process::DoEvents(void)
@ -338,4 +348,9 @@ bool Process::DoEvents(void)
return false;
}
int Process::GetTID(void) const
{
return (reinterpret_cast<uintptr_t>(this) / sizeof(void *)) % IOTHREADS;
}
#endif /* _WIN32 */

View File

@ -85,8 +85,9 @@ private:
boost::function<void (const ProcessResult&)> m_Callback;
ProcessResult m_Result;
static void IOThreadProc(void);
static void IOThreadProc(int tid);
bool DoEvents(void);
int GetTID(void) const;
#endif /* _WIN32 */
};