Fixes #1955 - Handle unexpected errors, but don't ignore background threads.

Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
Daniel Nephin 2015-09-09 14:38:46 -04:00 committed by Daniel Nephin
parent b25a300895
commit 61415cd8bc
2 changed files with 19 additions and 13 deletions

View File

@ -21,7 +21,6 @@ def parallel_execute(objects, obj_callable, msg_index, msg):
""" """
stream = get_output_stream(sys.stdout) stream = get_output_stream(sys.stdout)
lines = [] lines = []
errors = {}
for obj in objects: for obj in objects:
write_out_msg(stream, lines, msg_index(obj), msg) write_out_msg(stream, lines, msg_index(obj), msg)
@ -29,16 +28,17 @@ def parallel_execute(objects, obj_callable, msg_index, msg):
q = Queue() q = Queue()
def inner_execute_function(an_callable, parameter, msg_index): def inner_execute_function(an_callable, parameter, msg_index):
error = None
try: try:
result = an_callable(parameter) result = an_callable(parameter)
except APIError as e: except APIError as e:
errors[msg_index] = e.explanation error = e.explanation
result = "error" result = "error"
except Exception as e: except Exception as e:
errors[msg_index] = e error = e
result = 'unexpected_exception' result = 'unexpected_exception'
q.put((msg_index, result)) q.put((msg_index, result, error))
for an_object in objects: for an_object in objects:
t = Thread( t = Thread(
@ -49,15 +49,17 @@ def parallel_execute(objects, obj_callable, msg_index, msg):
t.start() t.start()
done = 0 done = 0
errors = {}
total_to_execute = len(objects) total_to_execute = len(objects)
while done < total_to_execute: while done < total_to_execute:
try: try:
msg_index, result = q.get(timeout=1) msg_index, result, error = q.get(timeout=1)
if result == 'unexpected_exception': if result == 'unexpected_exception':
raise errors[msg_index] errors[msg_index] = result, error
if result == 'error': if result == 'error':
errors[msg_index] = result, error
write_out_msg(stream, lines, msg_index, msg, status='error') write_out_msg(stream, lines, msg_index, msg, status='error')
else: else:
write_out_msg(stream, lines, msg_index, msg) write_out_msg(stream, lines, msg_index, msg)
@ -65,10 +67,14 @@ def parallel_execute(objects, obj_callable, msg_index, msg):
except Empty: except Empty:
pass pass
if errors: if not errors:
return
stream.write("\n") stream.write("\n")
for error in errors: for msg_index, (result, error) in errors.items():
stream.write("ERROR: for {} {} \n".format(error, errors[error])) stream.write("ERROR: for {} {} \n".format(msg_index, error))
if result == 'unexpected_exception':
raise error
def get_output_stream(stream): def get_output_stream(stream):

View File

@ -638,8 +638,7 @@ class ServiceTest(DockerClientTestCase):
self.assertTrue(service.containers()[0].is_running) self.assertTrue(service.containers()[0].is_running)
self.assertIn("ERROR: for 2 Boom", mock_stdout.getvalue()) self.assertIn("ERROR: for 2 Boom", mock_stdout.getvalue())
@mock.patch('sys.stdout', new_callable=StringIO) def test_scale_with_api_returns_unexpected_exception(self):
def test_scale_with_api_returns_unexpected_exception(self, mock_stdout):
""" """
Test that when scaling if the API returns an error, that is not of type Test that when scaling if the API returns an error, that is not of type
APIError, that error is re-raised. APIError, that error is re-raised.
@ -650,7 +649,8 @@ class ServiceTest(DockerClientTestCase):
with mock.patch( with mock.patch(
'compose.container.Container.create', 'compose.container.Container.create',
side_effect=ValueError("BOOM")): side_effect=ValueError("BOOM")
):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
service.scale(3) service.scale(3)