Allow More Child Processes (#284)

- Added wait_for_multiple_objects_enhanced() function that can logically handle wait-any on many children (system resources permitting).
- Added unit tests for wait_for_multiple_objects_enhanced().
- Incorporated use of wait_for_multiple_objects_enhanced() into main signal handler.
- Upped max child processes to 500.
This commit is contained in:
Bryan Berns 2018-03-14 14:10:27 -04:00 committed by Manoj Ampalam
parent fb1342f7e0
commit 0dc5a971bd
10 changed files with 429 additions and 11 deletions

View File

@ -60,6 +60,9 @@
<ClCompile Include="$(OpenSSH-Src-Path)regress\unittests\win32compat\string_tests.c">
<ExcludedFromBuild Condition="$(UseOpenSSL)=='false'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="$(OpenSSH-Src-Path)regress\unittests\win32compat\signal_tests.c">
<ExcludedFromBuild Condition="$(UseOpenSSL)=='false'">true</ExcludedFromBuild>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(OpenSSH-Src-Path)regress\unittests\win32compat\tests.h" />

View File

@ -293,6 +293,7 @@
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\tnnet.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\utf.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\spawn.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\signal_wait.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\w32fd.h" />

View File

@ -9,6 +9,7 @@
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\misc.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\signal_sigalrm.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\signal_sigchld.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\signal_wait.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\w32log.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\pwd.c" />
<ClCompile Include="$(OpenSSH-Src-Path)\contrib\win32\win32compat\win32_dirent.c" />

View File

@ -282,18 +282,18 @@ wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
debug5("wait() on %d events and %d children", num_events, live_children);
/* TODO - implement signal catching and handling */
if (num_all_events) {
DWORD ret = WaitForMultipleObjectsEx(num_all_events, all_events, FALSE, milli_seconds, TRUE);
if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_all_events - 1)) {
DWORD ret = wait_for_multiple_objects_enhanced(num_all_events, all_events, milli_seconds, TRUE);
if ((ret >= WAIT_OBJECT_0_ENHANCED) && (ret <= WAIT_OBJECT_0_ENHANCED + num_all_events - 1)) {
/* woken up by event signalled
* is this due to a child process going down
*/
if (live_children && ((ret - WAIT_OBJECT_0) < live_children)) {
if (live_children && ((ret - WAIT_OBJECT_0_ENHANCED) < live_children)) {
sigaddset(&pending_signals, W32_SIGCHLD);
sw_child_to_zombie(ret - WAIT_OBJECT_0);
sw_child_to_zombie(ret - WAIT_OBJECT_0_ENHANCED);
}
} else if (ret == WAIT_IO_COMPLETION) {
} else if (ret == WAIT_IO_COMPLETION_ENHANCED) {
/* APC processed due to IO or signal*/
} else if (ret == WAIT_TIMEOUT) {
} else if (ret == WAIT_TIMEOUT_ENHANCED) {
/* timed out */
return 0;
} else { /* some other error*/

View File

@ -1,7 +1,8 @@
#include <Windows.h>
/* child processes */
#define MAX_CHILDREN 50
#define MAX_CHILDREN 500
struct _children {
/*
* array of handles and process_ids.
@ -30,3 +31,12 @@ struct _timer_info {
__int64 run_time_sec; /* time in seconds, timer is set to go off from ticks_at_start */
};
int sw_init_timer();
#define WAIT_OBJECT_0_ENHANCED 0x00000000
#define WAIT_ABANDONED_0_ENHANCED 0x10000000
#define WAIT_TIMEOUT_ENHANCED 0x20000000
#define WAIT_IO_COMPLETION_ENHANCED 0x30000000
#define WAIT_FAILED_ENHANCED WAIT_FAILED
DWORD wait_for_multiple_objects_enhanced(_In_ DWORD nCount, _In_ const HANDLE *lpHandles,
_In_ DWORD dwMilliseconds, _In_ BOOL bAlertable);

View File

@ -222,9 +222,9 @@ waitpid(int pid, int *status, int options)
timeout = INFINITE;
if (options & WNOHANG)
timeout = 0;
ret = WaitForMultipleObjects(children.num_children, children.handles, FALSE, timeout);
if ((ret >= WAIT_OBJECT_0) && (ret < (WAIT_OBJECT_0 + children.num_children))) {
index = ret - WAIT_OBJECT_0;
ret = wait_for_multiple_objects_enhanced(children.num_children, children.handles, timeout, FALSE);
if ((ret >= WAIT_OBJECT_0_ENHANCED) && (ret < (WAIT_OBJECT_0_ENHANCED + children.num_children))) {
index = ret - WAIT_OBJECT_0_ENHANCED;
process = children.handles[index];
ret_id = children.process_id[index];
GetExitCodeProcess(process, &exit_code);
@ -233,7 +233,7 @@ waitpid(int pid, int *status, int options)
if (status)
*status = exit_code;
return ret_id;
} else if (ret == WAIT_TIMEOUT) {
} else if (ret == WAIT_TIMEOUT_ENHANCED) {
/* TODO - assert that WNOHANG was specified*/
return 0;
}

View File

@ -0,0 +1,226 @@
/*
* Author: Bryan Berns <berns@uwalumni.com>
*
* Partial replacement for WaitForMultipleObjectsEx that handles more than 64
* objects. This is tuned for OpenSSH use in (no need for 'wait-all' scenarios).
* This is only safe to use for objects whose transitional state is not
* automatically lost just by calling a WaitForMultipleObjects* or
* WaitForSingleObjects*.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met :
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and / or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES(INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "signal_internal.h"
typedef struct _wait_for_multiple_objects_struct {
/* synchronization management */
HANDLE thread_handle;
HANDLE wait_event;
/* native function parameters input and output */
DWORD num_handles;
const HANDLE * handles;
DWORD return_value;
}
wait_for_multiple_objects_struct;
DWORD WINAPI
wait_for_multiple_objects_thread(LPVOID lpParam)
{
wait_for_multiple_objects_struct *waitstruct =
(wait_for_multiple_objects_struct *) lpParam;
/* wait for bin to complete -- this is alertable for our interrupt cleanup routine */
waitstruct->return_value = WaitForMultipleObjectsEx(waitstruct->num_handles,
waitstruct->handles, FALSE, INFINITE, TRUE);
/* notify the main thread that an event was found */
SetEvent(waitstruct->wait_event);
return TRUE;
}
VOID CALLBACK
wait_for_multiple_objects_interrupter(_In_ ULONG_PTR dwParam)
{
/* we must explicitly exit the thread since the thread could have been received
* the alert prior to the thread running in which case it is acknowledged when
* the threads starts running instead of when it is waiting at
* WaitForMultipleObjectsEx */
ExitThread(0);
}
DWORD
wait_for_multiple_objects_enhanced(_In_ DWORD nCount, _In_ const HANDLE *lpHandles,
_In_ DWORD dwMilliseconds, _In_ BOOL bAlertable)
{
/* number of separate bins / threads required to monitor execution */
const DWORD bin_size = MAXIMUM_WAIT_OBJECTS;
const DWORD bins_total = (nCount - 1) / bin_size + 1;
DWORD return_value = WAIT_FAILED_ENHANCED;
HANDLE wait_event = NULL;
wait_for_multiple_objects_struct *wait_bins = NULL;
DWORD wait_ret;
/* if less than the normal maximum then just use the built-in function
* to avoid the overhead of another thread */
if (nCount <= MAXIMUM_WAIT_OBJECTS) {
DWORD wait_ret = WaitForMultipleObjectsEx(nCount, lpHandles,
FALSE, dwMilliseconds, bAlertable);
if (wait_ret == WAIT_IO_COMPLETION) return WAIT_IO_COMPLETION_ENHANCED;
if (wait_ret == WAIT_TIMEOUT) return WAIT_TIMEOUT_ENHANCED;
/* translate normal offset to enhanced offset for abandoned threads */
if (wait_ret >= WAIT_ABANDONED_0 &&
wait_ret < WAIT_ABANDONED_0 + MAXIMUM_WAIT_OBJECTS) {
return WAIT_ABANDONED_0_ENHANCED + (wait_ret - WAIT_ABANDONED_0);
}
/* translate normal offset to enhanced offset for signaled threads */
if (wait_ret >= WAIT_OBJECT_0 &&
wait_ret < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS) {
return WAIT_OBJECT_0_ENHANCED + (wait_ret - WAIT_OBJECT_0);
}
return WAIT_FAILED_ENHANCED;
}
/* setup synchronization event to flag when the main thread should wake up */
wait_event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (wait_event == NULL) {
goto cleanup;
}
/* allocate an area to communicate with our threads */
wait_bins = (wait_for_multiple_objects_struct *)
calloc(bins_total, sizeof(wait_for_multiple_objects_struct));
if (wait_bins == NULL) {
goto cleanup;
}
ZeroMemory(wait_bins, bins_total * sizeof(wait_for_multiple_objects_struct));
/* initialize each thread that handles up to MAXIMUM_WAIT_OBJECTS each */
for (DWORD bin = 0; bin < bins_total; bin++) {
const int handles_processed = bin * bin_size;
wait_bins[bin].return_value = WAIT_FAILED - 1;
wait_bins[bin].wait_event = wait_event;
wait_bins[bin].handles = &(lpHandles[handles_processed]);
wait_bins[bin].num_handles = min(nCount - handles_processed, bin_size);
/* create a thread for this bin */
if ((wait_bins[bin].thread_handle = CreateThread(NULL, 2048,
wait_for_multiple_objects_thread,
(LPVOID) &(wait_bins[bin]), 0, NULL)) == NULL) {
goto cleanup;
}
}
/* wait for at least one thread to return; this will indicate that return
* value will have been set in our bin array */
wait_ret = WaitForSingleObjectEx(wait_event, dwMilliseconds, bAlertable);
/* if io alert just skip to end */
if (wait_ret == WAIT_IO_COMPLETION) {
return_value = WAIT_IO_COMPLETION_ENHANCED;
goto cleanup;
}
/* if timeout just skip to end */
if (wait_ret == WAIT_TIMEOUT) {
return_value = WAIT_TIMEOUT_ENHANCED;
goto cleanup;
}
/* unexpected output result */
if (wait_ret != WAIT_OBJECT_0) {
return_value = WAIT_FAILED_ENHANCED;
goto cleanup;
}
/* only looking for one object events */
for (DWORD bin = 0; bin < bins_total; bin++) {
/* skip bins that have have the default, unprocessed status */
if (wait_bins[bin].return_value == WAIT_FAILED - 1)
continue;
/* return failure if a bin has been processed but returned an
* invalid or unexpected status */
if (wait_bins[bin].return_value == WAIT_FAILED ||
wait_bins[bin].return_value == WAIT_IO_COMPLETION ||
wait_bins[bin].return_value == WAIT_TIMEOUT)
{
return_value = WAIT_FAILED_ENHANCED;
break;
}
/* translate normal offset to enhanced offset for abandoned threads */
if (wait_bins[bin].return_value >= WAIT_ABANDONED_0 &&
wait_bins[bin].return_value < WAIT_ABANDONED_0 + wait_bins[bin].num_handles) {
return_value = WAIT_ABANDONED_0_ENHANCED +
bin * bin_size + (wait_bins[bin].return_value - WAIT_ABANDONED_0);
break;
}
/* translate normal offset to enhanced offset for signaled threads */
if (wait_bins[bin].return_value >= WAIT_OBJECT_0 &&
wait_bins[bin].return_value < WAIT_OBJECT_0 + wait_bins[bin].num_handles) {
return_value = WAIT_OBJECT_0_ENHANCED +
bin * bin_size + (wait_bins[bin].return_value - WAIT_OBJECT_0);
break;
}
}
cleanup:
/* interrupt any outstanding threads */
for (DWORD bin = 0; bin < bins_total; bin++) {
if (wait_bins[bin].thread_handle != NULL) {
/* send each thread that is still waiting a signal to wake up;
* if the thread in not waiting and still has not fully
* finished executing then it will just ignore the signal */
if (wait_bins[bin].return_value == (WAIT_FAILED - 1)) {
QueueUserAPC(wait_for_multiple_objects_interrupter,
wait_bins[bin].thread_handle, (ULONG_PTR)NULL);
}
/* we must wait for these threads to complete so we can
* safely cleanup the shared resources */
WaitForSingleObject(wait_bins[bin].thread_handle, INFINITE);
CloseHandle(wait_bins[bin].thread_handle);
}
}
if (wait_event)
CloseHandle(wait_event);
if (wait_bins)
free(wait_bins);
return return_value;
}

View File

@ -0,0 +1,175 @@
/*
* Author: Bryan Berns <berns@uwalumni.com>
*/
#include "includes.h"
#include "signal_internal.h"
#include "../test_helper/test_helper.h"
#include "tests.h"
VOID CALLBACK
signal_test_dummy_apc(_In_ ULONG_PTR dwParam)
{
/* dummy */
}
DWORD WINAPI
signal_test_send_apc(LPVOID lpParam)
{
HANDLE thread = (HANDLE)lpParam;
Sleep(250);
QueueUserAPC(signal_test_dummy_apc, thread, (ULONG_PTR)NULL);
return TRUE;
}
DWORD WINAPI
signal_test_set_event(LPVOID lpParam)
{
HANDLE hevent = (HANDLE)lpParam;
Sleep(10);
SetEvent(hevent);
return TRUE;
}
DWORD WINAPI
signal_create_abandoned_object(LPVOID lpParam)
{
*((HANDLE *)lpParam) = CreateMutex(NULL, TRUE, 0);
return TRUE;
}
VOID TEST_RESOURCES(BOOL start)
{
static DWORD initial_count = 0;
if (start) GetProcessHandleCount(GetCurrentProcess(), &initial_count);
else {
DWORD final_count;
GetProcessHandleCount(GetCurrentProcess(), &final_count);
ASSERT_INT_EQ(initial_count, final_count);
}
}
void
signal_test_wait_for_multiple_objects()
{
/* shared test resources */
HANDLE current_thread = OpenThread(THREAD_ALL_ACCESS, FALSE, GetCurrentThreadId());
HANDLE current_process = GetCurrentProcess();
/* events for testing */
HANDLE hObjects[300];
const DWORD objects_size = ARRAYSIZE(hObjects);;
for (int i = 0; i < objects_size; i++)
hObjects[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
/* create abandoned mutex */
HANDLE abandoned_mutux = NULL;
HANDLE mutex_thread = CreateThread(NULL, 0, signal_create_abandoned_object, &abandoned_mutux, 0, NULL);
WaitForSingleObject(mutex_thread, INFINITE);
CloseHandle(mutex_thread);
{
TEST_START("Signal: APC wakeup with select event counts (WAIT_IO_COMPLETION_ENHANCED)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) ResetEvent(hObjects[i]);
for (int i = 0; i < objects_size; i++) {
DWORD select = i % MAXIMUM_WAIT_OBJECTS;
if (select == 0 || select == 1 || select == MAXIMUM_WAIT_OBJECTS - 1 || select == MAXIMUM_WAIT_OBJECTS - 2) {
CloseHandle(CreateThread(NULL, 0, signal_test_send_apc, current_thread, 0, NULL));
DWORD ret = wait_for_multiple_objects_enhanced(i + 1, hObjects, 10000, TRUE);
ASSERT_INT_EQ(ret, WAIT_IO_COMPLETION_ENHANCED);
}
}
TEST_RESOURCES(FALSE);
TEST_DONE();
}
{
TEST_START("Signal: Wait-any with one invalid event in positions 1-300 (WAIT_FAILED_ENHANCED)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) ResetEvent(hObjects[i]);
for (int i = 0; i < objects_size; i++) {
HANDLE event = hObjects[i];
hObjects[i] = NULL;
DWORD ret = wait_for_multiple_objects_enhanced(objects_size, hObjects, 10000, FALSE);
ASSERT_INT_EQ(ret, WAIT_FAILED_ENHANCED);
hObjects[i] = event;
}
TEST_RESOURCES(FALSE);
TEST_DONE();
}
{
TEST_START("Signal: Wait-any with signaled event in positions 1-300 (WAIT_OBJECT_0_ENHANCED)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) {
SetEvent(hObjects[i]);
DWORD ret = wait_for_multiple_objects_enhanced(i + 1, hObjects, 10000, FALSE);
ASSERT_INT_EQ(ret, i + WAIT_OBJECT_0_ENHANCED);
ResetEvent(hObjects[i]);
}
TEST_RESOURCES(FALSE);
TEST_DONE();
}
{
TEST_START("Signal: Wait-any with latent events (WAIT_TIMEOUT_ENHANCED)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) ResetEvent(hObjects[i]);
DWORD ret = wait_for_multiple_objects_enhanced(objects_size, hObjects, 250, FALSE);
ASSERT_INT_EQ(ret, WAIT_TIMEOUT_ENHANCED);
TEST_RESOURCES(FALSE);
TEST_DONE();
}
{
TEST_START("Signal: Wait-any with async event in positions 1-300 (WAIT_OBJECT_0_ENHANCED offset)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) ResetEvent(hObjects[i]);
for (int i = 0; i < objects_size; i++) {
CloseHandle(CreateThread(NULL, 0, signal_test_set_event, hObjects[i], 0, NULL));
DWORD ret = wait_for_multiple_objects_enhanced(objects_size, hObjects, 10000, FALSE);
ASSERT_INT_EQ(ret, i + WAIT_OBJECT_0_ENHANCED);
ResetEvent(hObjects[i]);
}
TEST_RESOURCES(FALSE);
TEST_DONE();
}
{
TEST_START("Signal: Wait-any with abandoned mutex in positions 1-300 (WAIT_ABANDONED_0_ENHANCED offset)");
TEST_RESOURCES(TRUE);
for (int i = 0; i < objects_size; i++) ResetEvent(hObjects[i]);
for (int i = 0; i < objects_size; i++) {
HANDLE original_event = hObjects[i];
hObjects[i] = abandoned_mutux;
DWORD ret = wait_for_multiple_objects_enhanced(objects_size, hObjects, 10000, FALSE);
ASSERT_INT_EQ(ret, i + WAIT_ABANDONED_0_ENHANCED);
hObjects[i] = original_event;
}
TEST_RESOURCES(FALSE);
TEST_DONE();
}
for (int i = 0; i < objects_size; i++) CloseHandle(hObjects[i]);
CloseHandle(current_thread);
}
void
signal_tests()
{
signal_test_wait_for_multiple_objects();
}

View File

@ -19,6 +19,7 @@ tests()
{
_set_abort_behavior(0, 1);
log_init(NULL, 7, 2, 0);
signal_tests();
socket_tests();
file_tests();
dir_tests();

View File

@ -1,4 +1,5 @@
#pragma once
void signal_tests();
void socket_tests();
void file_tests();
void miscellaneous_tests();