From d316d29b60e7ef54bea246067946599c94c9a362 Mon Sep 17 00:00:00 2001 From: Peter Date: Tue, 9 Feb 2016 18:01:58 -0500 Subject: [PATCH] Adding KmipSession This change adds a KmipSession class that manages individual client/server connections in a thread of execution separate from the main thread. A test suite is included. --- kmip/services/server/session.py | 176 +++++++++++++ .../unit/services/server/test_session.py | 248 ++++++++++++++++++ 2 files changed, 424 insertions(+) create mode 100644 kmip/services/server/session.py create mode 100644 kmip/tests/unit/services/server/test_session.py diff --git a/kmip/services/server/session.py b/kmip/services/server/session.py new file mode 100644 index 0000000..525c3c9 --- /dev/null +++ b/kmip/services/server/session.py @@ -0,0 +1,176 @@ +# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import socket +import struct +import threading +import time + +from kmip.core.messages import messages +from kmip.core import utils + +from kmip.core import enums +from kmip.core.messages import contents + + +class KmipSession(threading.Thread): + """ + A session thread representing a single KMIP client/server interaction. + """ + + def __init__(self, engine, connection, name=None): + """ + Create a KmipSession. + + Args: + engine (KmipEngine): A reference to the central server application + that handles message processing. Required. + connection (socket): A client socket.socket TLS connection + representing a new KMIP connection. Required. + name (str): The name of the KmipSession. Optional, defaults to + None. + """ + super(KmipSession, self).__init__( + group=None, + target=None, + name=name, + args=(), + kwargs={} + ) + + self._logger = logging.getLogger('.'.join((__name__, name))) + + self._engine = engine + self._connection = connection + + self._max_buffer_size = 4096 + self._max_request_size = 1048576 + self._max_response_size = 1048576 + + def run(self): + """ + The main thread routine executed by invoking thread.start. + + This method manages the new client connection, running a message + handling loop. Once this method completes, the thread is finished. + """ + self._logger.info("Starting session: {0}".format(self.name)) + + try: + self._handle_message_loop() + except Exception as e: + self._logger.info("Failure handling message loop") + self._logger.exception(e) + finally: + self._connection.shutdown(socket.SHUT_RDWR) + self._connection.close() + self._logger.info("Stopping session: {0}".format(self.name)) + + def _handle_message_loop(self): + request_data = self._receive_request() + request = messages.RequestMessage() + + try: + request.read(request_data) + except Exception as e: + self._logger.info("Failure parsing request message.") + self._logger.exception(e) + response = self._build_error_response( + enums.ResultStatus.OPERATION_FAILED, + enums.ResultReason.INVALID_MESSAGE, + "Error parsing request message. See server logs for more " + "information.") + else: + # TODO (peterhamilton): Replace this with a KmipEngine call. + response = self._build_error_response( + enums.ResultStatus.OPERATION_FAILED, + enums.ResultReason.INVALID_MESSAGE, + "Default response. No operations supported." + ) + + response_data = utils.BytearrayStream() + response.write(response_data) + + if len(response_data) > self._max_response_size: + self._logger.error( + "Response message length too large: " + "{0} bytes, max {1} bytes".format( + len(response_data), + self._max_response_size + ) + ) + response = self._build_error_response( + enums.ResultStatus.OPERATION_FAILED, + enums.ResultReason.RESPONSE_TOO_LARGE, + "Response message length too large. See server logs for " + "more information.") + + self._send_response(response_data.buffer) + + def _build_error_response(self, status, reason, message): + """ + TODO (peterhamilton): Move this into the KmipEngine. + """ + header = messages.ResponseHeader( + protocol_version=contents.ProtocolVersion.create(1, 1), + time_stamp=contents.TimeStamp(int(time.time())), + batch_count=contents.BatchCount(1)) + batch_item = messages.ResponseBatchItem( + result_status=contents.ResultStatus(status), + result_reason=contents.ResultReason(reason), + result_message=contents.ResultMessage(message) + ) + response = messages.ResponseMessage( + response_header=header, + batch_items=[batch_item] + ) + return response + + def _receive_request(self): + header = self._receive_bytes(8) + message_size = struct.unpack('!I', header[4:])[0] + + payload = self._receive_bytes(message_size) + data = utils.BytearrayStream(header + payload) + + return data + + def _receive_bytes(self, message_size): + bytes_received = 0 + message = b'' + + while bytes_received < message_size: + partial_message = self._connection.recv( + min(message_size - bytes_received, self._max_buffer_size) + ) + + if partial_message is None: + break + else: + bytes_received += len(partial_message) + message += partial_message + + if bytes_received != message_size: + raise ValueError( + "Invalid KMIP message received. Actual message length " + "does not match the advertised header length." + ) + else: + return message + + def _send_response(self, data): + if len(data) > 0: + self._connection.sendall(bytes(data)) diff --git a/kmip/tests/unit/services/server/test_session.py b/kmip/tests/unit/services/server/test_session.py new file mode 100644 index 0000000..e3150bb --- /dev/null +++ b/kmip/tests/unit/services/server/test_session.py @@ -0,0 +1,248 @@ +# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import socket +import testtools + +from kmip.core import utils +from kmip.services.server import session + + +class TestKmipSession(testtools.TestCase): + """ + A test suite for the KmipSession. + """ + + def setUp(self): + super(TestKmipSession, self).setUp() + + def tearDown(self): + super(TestKmipSession, self).tearDown() + + def test_init(self): + """ + Test that a KmipSession can be created without errors. + """ + session.KmipSession(None, None, 'name') + + def test_run(self): + """ + Test that the message handling loop is handled properly on normal + execution. + """ + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._logger = mock.MagicMock() + kmip_session._handle_message_loop = mock.MagicMock() + kmip_session._connection = mock.MagicMock() + + kmip_session.run() + + kmip_session._logger.info.assert_any_call("Starting session: name") + kmip_session._handle_message_loop.assert_called_once_with() + kmip_session._connection.shutdown.assert_called_once_with( + socket.SHUT_RDWR + ) + kmip_session._connection.close.assert_called_once_with() + kmip_session._logger.info.assert_called_with("Stopping session: name") + + def test_run_with_failure(self): + """ + Test that the correct logging and error handling occurs when the + thread encounters an error with the message handling loop. + """ + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._logger = mock.MagicMock() + kmip_session._connection = mock.MagicMock() + + test_exception = Exception("test") + kmip_session._handle_message_loop = mock.MagicMock( + side_effect=test_exception + ) + + kmip_session.run() + + kmip_session._logger.info.assert_any_call("Starting session: name") + kmip_session._handle_message_loop.assert_called_once_with() + kmip_session._logger.info.assert_any_call( + "Failure handling message loop" + ) + kmip_session._logger.exception.assert_called_once_with(test_exception) + kmip_session._connection.shutdown.assert_called_once_with( + socket.SHUT_RDWR + ) + kmip_session._connection.close.assert_called_once_with() + kmip_session._logger.info.assert_called_with("Stopping session: name") + + @mock.patch('kmip.core.messages.messages.RequestMessage') + def test_handle_message_loop(self, request_mock): + """ + Test that the correct logging and error handling occurs during the + message handling loop. + """ + data = utils.BytearrayStream(()) + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._logger = mock.MagicMock() + kmip_session._connection = mock.MagicMock() + kmip_session._receive_request = mock.MagicMock(return_value=data) + kmip_session._send_response = mock.MagicMock() + + kmip_session._handle_message_loop() + + kmip_session._receive_request.assert_called_once_with() + kmip_session._logger.info.assert_not_called() + kmip_session._logger.error.assert_not_called() + kmip_session._logger.exception.assert_not_called() + self.assertTrue(kmip_session._send_response.called) + + @mock.patch('kmip.core.messages.messages.RequestMessage.read', + mock.MagicMock(side_effect=Exception())) + def test_handle_message_loop_with_parse_failure(self): + """ + Test that the correct logging and error handling occurs during the + message handling loop. + """ + data = utils.BytearrayStream(()) + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._logger = mock.MagicMock() + kmip_session._connection = mock.MagicMock() + kmip_session._receive_request = mock.MagicMock(return_value=data) + kmip_session._send_response = mock.MagicMock() + + kmip_session._handle_message_loop() + + kmip_session._receive_request.assert_called_once_with() + kmip_session._logger.info.assert_called_once_with( + "Failure parsing request message." + ) + self.assertTrue(kmip_session._logger.exception.called) + kmip_session._logger.error.assert_not_called() + self.assertTrue(kmip_session._send_response.called) + + @mock.patch('kmip.core.messages.messages.RequestMessage') + def test_handle_message_loop_with_response_too_long(self, request_mock): + """ + Test that the correct logging and error handling occurs during the + message handling loop. + """ + data = utils.BytearrayStream(()) + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._logger = mock.MagicMock() + kmip_session._connection = mock.MagicMock() + kmip_session._receive_request = mock.MagicMock(return_value=data) + kmip_session._send_response = mock.MagicMock() + kmip_session._max_response_size = 0 + + kmip_session._handle_message_loop() + + kmip_session._receive_request.assert_called_once_with() + kmip_session._logger.info.assert_not_called() + self.assertTrue(kmip_session._logger.error.called) + kmip_session._logger.exception.assert_not_called() + self.assertTrue(kmip_session._send_response.called) + + def test_build_error_response(self): + """ + Test that a default error response can be built correctly. + + TODO (peterhamilton): Remove this test when the KmipEngine is added. + """ + from kmip.core import enums + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._build_error_response( + enums.ResultStatus.OPERATION_FAILED, + enums.ResultReason.GENERAL_FAILURE, + "General failure message." + ) + + def test_receive_request(self): + """ + Test that the session can correctly receive and parse a message + encoding. + """ + content = b'\x00\x00\x00\x00\x00\x00\x00\x00' + expected = utils.BytearrayStream((content)) + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._receive_bytes = mock.MagicMock( + side_effect=[content, b''] + ) + + observed = kmip_session._receive_request() + + kmip_session._receive_bytes.assert_any_call(8) + kmip_session._receive_bytes.assert_any_call(0) + + self.assertEqual(expected.buffer, observed.buffer) + + def test_receive_bytes(self): + """ + Test that the session can receive a message. + """ + content = b'\x00\x00\x00\x00\x00\x00\x00\x00' + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._connection = mock.MagicMock() + kmip_session._connection.recv = mock.MagicMock( + side_effect=[content, content] + ) + + observed = kmip_session._receive_bytes(16) + + kmip_session._connection.recv.assert_any_call(16) + kmip_session._connection.recv.assert_called_with(8) + self.assertEqual(content + content, observed) + + def test_receive_bytes_with_bad_length(self): + """ + Test that the session generates an error on an incorrectly sized + message. + """ + content = b'\x00\x00\x00\x00\x00\x00\x00\x00' + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._connection = mock.MagicMock() + kmip_session._connection.recv = mock.MagicMock( + side_effect=[content, content, None] + ) + + args = [32] + self.assertRaises(ValueError, kmip_session._receive_bytes, *args) + + kmip_session._connection.recv.assert_any_call(16) + kmip_session._connection.recv.assert_called_with(16) + + def test_send_message(self): + """ + Test that a data buffer, regardless of length, is sent correctly. + """ + buffer_full = utils.BytearrayStream(( + b'\x00\x00\x00\x00\x00\x00\x00\x00' + )) + buffer_empty = utils.BytearrayStream() + + kmip_session = session.KmipSession(None, None, 'name') + kmip_session._connection = mock.MagicMock() + + kmip_session._send_response(buffer_empty.buffer) + kmip_session._connection.sendall.assert_not_called() + + kmip_session._send_response(buffer_full.buffer) + kmip_session._connection.sendall.assert_called_once_with( + bytes(buffer_full.buffer) + )