Fix: Make sure we're continuously reading from child processes' pipes

fixes #9867
This commit is contained in:
Gunnar Beutner 2015-08-10 13:33:32 +02:00
parent f2b32ac549
commit b64c13cf3c
2 changed files with 87 additions and 17 deletions

View File

@ -61,7 +61,21 @@ INITIALIZE_ONCE(&Process::StaticInitialize);
Process::Process(const Process::Arguments& arguments, const Dictionary::Ptr& extraEnvironment)
: m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment), m_Timeout(600)
{ }
#ifdef _WIN32
, m_ReadPending(false), m_ReadFailed(false), m_Overlapped()
#endif /* _WIN32 */
{
#ifdef _WIN32
m_Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
#endif /* _WIN32 */
}
Process::~Process(void)
{
#ifdef _WIN32
CloseHandle(m_Overlapped.hEvent);
#endif /* _WIN32 */
}
void Process::StaticInitialize(void)
{
@ -152,6 +166,7 @@ void Process::IOThreadProc(int tid)
{
#ifdef _WIN32
HANDLE *handles = NULL;
HANDLE *fhandles = NULL;
#else /* _WIN32 */
pollfd *pfds = NULL;
#endif /* _WIN32 */
@ -171,8 +186,9 @@ void Process::IOThreadProc(int tid)
count = 1 + l_Processes[tid].size();
#ifdef _WIN32
handles = reinterpret_cast<HANDLE *>(realloc(handles, sizeof(HANDLE) * count));
fhandles = reinterpret_cast<HANDLE *>(realloc(fhandles, sizeof(HANDLE) * count));
handles[0] = l_Events[tid];
fhandles[0] = l_Events[tid];
#else /* _WIN32 */
pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
@ -185,16 +201,29 @@ void Process::IOThreadProc(int tid)
int i = 1;
std::pair<ProcessHandle, Process::Ptr> kv;
BOOST_FOREACH(kv, l_Processes[tid]) {
const Process::Ptr& process = kv.second;
#ifdef _WIN32
handles[i] = kv.first;
if (!process->m_ReadPending) {
process->m_ReadPending = true;
BOOL res = ReadFile(process->m_FD, process->m_ReadBuffer, sizeof(process->m_ReadBuffer), 0, &process->m_Overlapped);
if (res || GetLastError() != ERROR_IO_PENDING) {
process->m_ReadFailed = !res;
SetEvent(process->m_Overlapped.hEvent);
}
}
fhandles[i] = process->m_Overlapped.hEvent;
#else /* _WIN32 */
pfds[i].fd = kv.second->m_FD;
pfds[i].fd = process->m_FD;
pfds[i].events = POLLIN;
pfds[i].revents = 0;
#endif /* _WIN32 */
if (kv.second->m_Timeout != 0) {
double delta = kv.second->m_Timeout - (now - kv.second->m_Result.ExecutionStart);
if (process->m_Timeout != 0) {
double delta = process->m_Timeout - (now - process->m_Result.ExecutionStart);
if (timeout == -1 || delta < timeout)
timeout = delta;
@ -210,7 +239,7 @@ void Process::IOThreadProc(int tid)
timeout *= 1000;
#ifdef _WIN32
DWORD rc = WaitForMultipleObjects(count, handles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
DWORD rc = WaitForMultipleObjects(count, fhandles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
#else /* _WIN32 */
int rc = poll(pfds, count, timeout);
@ -290,6 +319,39 @@ String Process::PrettyPrintArguments(const Process::Arguments& arguments)
#endif /* _WIN32 */
}
#ifdef _WIN32
static BOOL CreatePipeOverlapped(HANDLE *outReadPipe, HANDLE *outWritePipe,
SECURITY_ATTRIBUTES *securityAttributes, DWORD size, DWORD readMode, DWORD writeMode)
{
static int pipeIndex = 0;
if (size == 0)
size = 8192;
pipeIndex++;
char pipeName[128];
sprintf(pipeName, "\\\\.\\Pipe\\OverlappedPipe.%d.%d", (int)GetCurrentProcessId(), pipeIndex);
*outReadPipe = CreateNamedPipe(pipeName, PIPE_ACCESS_INBOUND | readMode,
PIPE_TYPE_BYTE | PIPE_WAIT, 1, size, size, 60 * 1000, securityAttributes);
if (!*outReadPipe)
return FALSE;
*outWritePipe = CreateFile(pipeName, GENERIC_WRITE, 0, securityAttributes, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | writeMode, NULL);
if (*outWritePipe == INVALID_HANDLE_VALUE) {
DWORD error = GetLastError();
CloseHandle(*outReadPipe);
SetLastError(error);
return FALSE;
}
return TRUE;
}
#endif /* _WIN32 */
void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
{
boost::call_once(l_OnceFlag, &Process::ThreadInitialize);
@ -302,7 +364,7 @@ void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
sa.bInheritHandle = TRUE;
HANDLE outReadPipe, outWritePipe;
if (!CreatePipe(&outReadPipe, &outWritePipe, &sa, 0))
if (!CreatePipeOverlapped(&outReadPipe, &outWritePipe, &sa, 0, FILE_FLAG_OVERLAPPED, 0))
BOOST_THROW_EXCEPTION(win32_error()
<< boost::errinfo_api_function("CreatePipe")
<< errinfo_win32_error(GetLastError()));
@ -631,30 +693,30 @@ bool Process::DoEvents(void)
}
if (!is_timeout) {
#ifdef _WIN32
m_ReadPending = false;
DWORD rc;
if (!m_ReadFailed && GetOverlappedResult(m_FD, &m_Overlapped, &rc, TRUE) && rc > 0) {
m_OutputStream.write(m_ReadBuffer, rc);
return true;
}
#else /* _WIN32 */
char buffer[512];
for (;;) {
#ifdef _WIN32
DWORD rc;
if (!ReadFile(m_FD, buffer, sizeof(buffer), &rc, NULL) || rc == 0)
break;
#else /* _WIN32 */
int rc = read(m_FD, buffer, sizeof(buffer));
if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
return true;
if (rc > 0) {
#endif /* _WIN32 */
m_OutputStream.write(buffer, rc);
#ifdef _WIN32
return true;
#else /* _WIN32 */
continue;
}
#endif /* _WIN32 */
break;
}
#endif /* _WIN32 */
}
String output = m_OutputStream.str();

View File

@ -68,6 +68,7 @@ public:
static const std::deque<Process::Ptr>::size_type MaxTasksPerThread = 512;
Process(const Arguments& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr());
~Process(void);
void SetTimeout(double timeout);
double GetTimeout(void) const;
@ -93,6 +94,13 @@ private:
pid_t m_PID;
ConsoleHandle m_FD;
#ifdef _WIN32
bool m_ReadPending;
bool m_ReadFailed;
OVERLAPPED m_Overlapped;
char m_ReadBuffer[1024];
#endif /* _WIN32 */
std::ostringstream m_OutputStream;
boost::function<void (const ProcessResult&)> m_Callback;
ProcessResult m_Result;