mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-23 21:55:03 +02:00
Fix: Make sure we're continuously reading from child processes' pipes
fixes #9867
This commit is contained in:
parent
8f3396ae0d
commit
f541a62a34
@ -61,7 +61,21 @@ INITIALIZE_ONCE(&Process::StaticInitialize);
|
|||||||
|
|
||||||
Process::Process(const Process::Arguments& arguments, const Dictionary::Ptr& extraEnvironment)
|
Process::Process(const Process::Arguments& arguments, const Dictionary::Ptr& extraEnvironment)
|
||||||
: m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment), m_Timeout(600)
|
: m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment), m_Timeout(600)
|
||||||
{ }
|
#ifdef _WIN32
|
||||||
|
, m_ReadPending(false), m_ReadFailed(false), m_Overlapped()
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
{
|
||||||
|
#ifdef _WIN32
|
||||||
|
m_Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
}
|
||||||
|
|
||||||
|
Process::~Process(void)
|
||||||
|
{
|
||||||
|
#ifdef _WIN32
|
||||||
|
CloseHandle(m_Overlapped.hEvent);
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
}
|
||||||
|
|
||||||
void Process::StaticInitialize(void)
|
void Process::StaticInitialize(void)
|
||||||
{
|
{
|
||||||
@ -152,6 +166,7 @@ void Process::IOThreadProc(int tid)
|
|||||||
{
|
{
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
HANDLE *handles = NULL;
|
HANDLE *handles = NULL;
|
||||||
|
HANDLE *fhandles = NULL;
|
||||||
#else /* _WIN32 */
|
#else /* _WIN32 */
|
||||||
pollfd *pfds = NULL;
|
pollfd *pfds = NULL;
|
||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
@ -171,8 +186,9 @@ void Process::IOThreadProc(int tid)
|
|||||||
count = 1 + l_Processes[tid].size();
|
count = 1 + l_Processes[tid].size();
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
handles = reinterpret_cast<HANDLE *>(realloc(handles, sizeof(HANDLE) * count));
|
handles = reinterpret_cast<HANDLE *>(realloc(handles, sizeof(HANDLE) * count));
|
||||||
|
fhandles = reinterpret_cast<HANDLE *>(realloc(fhandles, sizeof(HANDLE) * count));
|
||||||
|
|
||||||
handles[0] = l_Events[tid];
|
fhandles[0] = l_Events[tid];
|
||||||
|
|
||||||
#else /* _WIN32 */
|
#else /* _WIN32 */
|
||||||
pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
|
pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
|
||||||
@ -185,16 +201,29 @@ void Process::IOThreadProc(int tid)
|
|||||||
int i = 1;
|
int i = 1;
|
||||||
std::pair<ProcessHandle, Process::Ptr> kv;
|
std::pair<ProcessHandle, Process::Ptr> kv;
|
||||||
BOOST_FOREACH(kv, l_Processes[tid]) {
|
BOOST_FOREACH(kv, l_Processes[tid]) {
|
||||||
|
const Process::Ptr& process = kv.second;
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
handles[i] = kv.first;
|
handles[i] = kv.first;
|
||||||
|
|
||||||
|
if (!process->m_ReadPending) {
|
||||||
|
process->m_ReadPending = true;
|
||||||
|
|
||||||
|
BOOL res = ReadFile(process->m_FD, process->m_ReadBuffer, sizeof(process->m_ReadBuffer), 0, &process->m_Overlapped);
|
||||||
|
if (res || GetLastError() != ERROR_IO_PENDING) {
|
||||||
|
process->m_ReadFailed = !res;
|
||||||
|
SetEvent(process->m_Overlapped.hEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fhandles[i] = process->m_Overlapped.hEvent;
|
||||||
#else /* _WIN32 */
|
#else /* _WIN32 */
|
||||||
pfds[i].fd = kv.second->m_FD;
|
pfds[i].fd = process->m_FD;
|
||||||
pfds[i].events = POLLIN;
|
pfds[i].events = POLLIN;
|
||||||
pfds[i].revents = 0;
|
pfds[i].revents = 0;
|
||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
|
|
||||||
if (kv.second->m_Timeout != 0) {
|
if (process->m_Timeout != 0) {
|
||||||
double delta = kv.second->m_Timeout - (now - kv.second->m_Result.ExecutionStart);
|
double delta = process->m_Timeout - (now - process->m_Result.ExecutionStart);
|
||||||
|
|
||||||
if (timeout == -1 || delta < timeout)
|
if (timeout == -1 || delta < timeout)
|
||||||
timeout = delta;
|
timeout = delta;
|
||||||
@ -210,7 +239,7 @@ void Process::IOThreadProc(int tid)
|
|||||||
timeout *= 1000;
|
timeout *= 1000;
|
||||||
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
DWORD rc = WaitForMultipleObjects(count, handles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
|
DWORD rc = WaitForMultipleObjects(count, fhandles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
|
||||||
#else /* _WIN32 */
|
#else /* _WIN32 */
|
||||||
int rc = poll(pfds, count, timeout);
|
int rc = poll(pfds, count, timeout);
|
||||||
|
|
||||||
@ -290,6 +319,39 @@ String Process::PrettyPrintArguments(const Process::Arguments& arguments)
|
|||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef _WIN32
|
||||||
|
static BOOL CreatePipeOverlapped(HANDLE *outReadPipe, HANDLE *outWritePipe,
|
||||||
|
SECURITY_ATTRIBUTES *securityAttributes, DWORD size, DWORD readMode, DWORD writeMode)
|
||||||
|
{
|
||||||
|
static int pipeIndex = 0;
|
||||||
|
|
||||||
|
if (size == 0)
|
||||||
|
size = 8192;
|
||||||
|
|
||||||
|
pipeIndex++;
|
||||||
|
|
||||||
|
char pipeName[128];
|
||||||
|
sprintf(pipeName, "\\\\.\\Pipe\\OverlappedPipe.%d.%d", (int)GetCurrentProcessId(), pipeIndex);
|
||||||
|
|
||||||
|
*outReadPipe = CreateNamedPipe(pipeName, PIPE_ACCESS_INBOUND | readMode,
|
||||||
|
PIPE_TYPE_BYTE | PIPE_WAIT, 1, size, size, 60 * 1000, securityAttributes);
|
||||||
|
|
||||||
|
if (!*outReadPipe)
|
||||||
|
return FALSE;
|
||||||
|
|
||||||
|
*outWritePipe = CreateFile(pipeName, GENERIC_WRITE, 0, securityAttributes, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | writeMode, NULL);
|
||||||
|
|
||||||
|
if (*outWritePipe == INVALID_HANDLE_VALUE) {
|
||||||
|
DWORD error = GetLastError();
|
||||||
|
CloseHandle(*outReadPipe);
|
||||||
|
SetLastError(error);
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
|
||||||
void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
|
void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
|
||||||
{
|
{
|
||||||
boost::call_once(l_OnceFlag, &Process::ThreadInitialize);
|
boost::call_once(l_OnceFlag, &Process::ThreadInitialize);
|
||||||
@ -302,7 +364,7 @@ void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
|
|||||||
sa.bInheritHandle = TRUE;
|
sa.bInheritHandle = TRUE;
|
||||||
|
|
||||||
HANDLE outReadPipe, outWritePipe;
|
HANDLE outReadPipe, outWritePipe;
|
||||||
if (!CreatePipe(&outReadPipe, &outWritePipe, &sa, 0))
|
if (!CreatePipeOverlapped(&outReadPipe, &outWritePipe, &sa, 0, FILE_FLAG_OVERLAPPED, 0))
|
||||||
BOOST_THROW_EXCEPTION(win32_error()
|
BOOST_THROW_EXCEPTION(win32_error()
|
||||||
<< boost::errinfo_api_function("CreatePipe")
|
<< boost::errinfo_api_function("CreatePipe")
|
||||||
<< errinfo_win32_error(GetLastError()));
|
<< errinfo_win32_error(GetLastError()));
|
||||||
@ -631,30 +693,30 @@ bool Process::DoEvents(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!is_timeout) {
|
if (!is_timeout) {
|
||||||
|
#ifdef _WIN32
|
||||||
|
m_ReadPending = false;
|
||||||
|
|
||||||
|
DWORD rc;
|
||||||
|
if (!m_ReadFailed && GetOverlappedResult(m_FD, &m_Overlapped, &rc, TRUE) && rc > 0) {
|
||||||
|
m_OutputStream.write(m_ReadBuffer, rc);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
#else /* _WIN32 */
|
||||||
char buffer[512];
|
char buffer[512];
|
||||||
for (;;) {
|
for (;;) {
|
||||||
#ifdef _WIN32
|
|
||||||
DWORD rc;
|
|
||||||
if (!ReadFile(m_FD, buffer, sizeof(buffer), &rc, NULL) || rc == 0)
|
|
||||||
break;
|
|
||||||
#else /* _WIN32 */
|
|
||||||
int rc = read(m_FD, buffer, sizeof(buffer));
|
int rc = read(m_FD, buffer, sizeof(buffer));
|
||||||
|
|
||||||
if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
if (rc > 0) {
|
if (rc > 0) {
|
||||||
#endif /* _WIN32 */
|
|
||||||
m_OutputStream.write(buffer, rc);
|
m_OutputStream.write(buffer, rc);
|
||||||
#ifdef _WIN32
|
|
||||||
return true;
|
|
||||||
#else /* _WIN32 */
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
#endif /* _WIN32 */
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
#endif /* _WIN32 */
|
||||||
}
|
}
|
||||||
|
|
||||||
String output = m_OutputStream.str();
|
String output = m_OutputStream.str();
|
||||||
|
@ -68,6 +68,7 @@ public:
|
|||||||
static const std::deque<Process::Ptr>::size_type MaxTasksPerThread = 512;
|
static const std::deque<Process::Ptr>::size_type MaxTasksPerThread = 512;
|
||||||
|
|
||||||
Process(const Arguments& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr());
|
Process(const Arguments& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr());
|
||||||
|
~Process(void);
|
||||||
|
|
||||||
void SetTimeout(double timeout);
|
void SetTimeout(double timeout);
|
||||||
double GetTimeout(void) const;
|
double GetTimeout(void) const;
|
||||||
@ -93,6 +94,13 @@ private:
|
|||||||
pid_t m_PID;
|
pid_t m_PID;
|
||||||
ConsoleHandle m_FD;
|
ConsoleHandle m_FD;
|
||||||
|
|
||||||
|
#ifdef _WIN32
|
||||||
|
bool m_ReadPending;
|
||||||
|
bool m_ReadFailed;
|
||||||
|
OVERLAPPED m_Overlapped;
|
||||||
|
char m_ReadBuffer[1024];
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
|
||||||
std::ostringstream m_OutputStream;
|
std::ostringstream m_OutputStream;
|
||||||
boost::function<void (const ProcessResult&)> m_Callback;
|
boost::function<void (const ProcessResult&)> m_Callback;
|
||||||
ProcessResult m_Result;
|
ProcessResult m_Result;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user