mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #123 from OpenKMIP/feat/add-kmip-session
Adding KmipSession
This commit is contained in:
commit
52c7103681
|
@ -0,0 +1,176 @@
|
|||
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
|
||||
from kmip.core.messages import messages
|
||||
from kmip.core import utils
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core.messages import contents
|
||||
|
||||
|
||||
class KmipSession(threading.Thread):
|
||||
"""
|
||||
A session thread representing a single KMIP client/server interaction.
|
||||
"""
|
||||
|
||||
def __init__(self, engine, connection, name=None):
|
||||
"""
|
||||
Create a KmipSession.
|
||||
|
||||
Args:
|
||||
engine (KmipEngine): A reference to the central server application
|
||||
that handles message processing. Required.
|
||||
connection (socket): A client socket.socket TLS connection
|
||||
representing a new KMIP connection. Required.
|
||||
name (str): The name of the KmipSession. Optional, defaults to
|
||||
None.
|
||||
"""
|
||||
super(KmipSession, self).__init__(
|
||||
group=None,
|
||||
target=None,
|
||||
name=name,
|
||||
args=(),
|
||||
kwargs={}
|
||||
)
|
||||
|
||||
self._logger = logging.getLogger('.'.join((__name__, name)))
|
||||
|
||||
self._engine = engine
|
||||
self._connection = connection
|
||||
|
||||
self._max_buffer_size = 4096
|
||||
self._max_request_size = 1048576
|
||||
self._max_response_size = 1048576
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
The main thread routine executed by invoking thread.start.
|
||||
|
||||
This method manages the new client connection, running a message
|
||||
handling loop. Once this method completes, the thread is finished.
|
||||
"""
|
||||
self._logger.info("Starting session: {0}".format(self.name))
|
||||
|
||||
try:
|
||||
self._handle_message_loop()
|
||||
except Exception as e:
|
||||
self._logger.info("Failure handling message loop")
|
||||
self._logger.exception(e)
|
||||
finally:
|
||||
self._connection.shutdown(socket.SHUT_RDWR)
|
||||
self._connection.close()
|
||||
self._logger.info("Stopping session: {0}".format(self.name))
|
||||
|
||||
def _handle_message_loop(self):
|
||||
request_data = self._receive_request()
|
||||
request = messages.RequestMessage()
|
||||
|
||||
try:
|
||||
request.read(request_data)
|
||||
except Exception as e:
|
||||
self._logger.info("Failure parsing request message.")
|
||||
self._logger.exception(e)
|
||||
response = self._build_error_response(
|
||||
enums.ResultStatus.OPERATION_FAILED,
|
||||
enums.ResultReason.INVALID_MESSAGE,
|
||||
"Error parsing request message. See server logs for more "
|
||||
"information.")
|
||||
else:
|
||||
# TODO (peterhamilton): Replace this with a KmipEngine call.
|
||||
response = self._build_error_response(
|
||||
enums.ResultStatus.OPERATION_FAILED,
|
||||
enums.ResultReason.INVALID_MESSAGE,
|
||||
"Default response. No operations supported."
|
||||
)
|
||||
|
||||
response_data = utils.BytearrayStream()
|
||||
response.write(response_data)
|
||||
|
||||
if len(response_data) > self._max_response_size:
|
||||
self._logger.error(
|
||||
"Response message length too large: "
|
||||
"{0} bytes, max {1} bytes".format(
|
||||
len(response_data),
|
||||
self._max_response_size
|
||||
)
|
||||
)
|
||||
response = self._build_error_response(
|
||||
enums.ResultStatus.OPERATION_FAILED,
|
||||
enums.ResultReason.RESPONSE_TOO_LARGE,
|
||||
"Response message length too large. See server logs for "
|
||||
"more information.")
|
||||
|
||||
self._send_response(response_data.buffer)
|
||||
|
||||
def _build_error_response(self, status, reason, message):
|
||||
"""
|
||||
TODO (peterhamilton): Move this into the KmipEngine.
|
||||
"""
|
||||
header = messages.ResponseHeader(
|
||||
protocol_version=contents.ProtocolVersion.create(1, 1),
|
||||
time_stamp=contents.TimeStamp(int(time.time())),
|
||||
batch_count=contents.BatchCount(1))
|
||||
batch_item = messages.ResponseBatchItem(
|
||||
result_status=contents.ResultStatus(status),
|
||||
result_reason=contents.ResultReason(reason),
|
||||
result_message=contents.ResultMessage(message)
|
||||
)
|
||||
response = messages.ResponseMessage(
|
||||
response_header=header,
|
||||
batch_items=[batch_item]
|
||||
)
|
||||
return response
|
||||
|
||||
def _receive_request(self):
|
||||
header = self._receive_bytes(8)
|
||||
message_size = struct.unpack('!I', header[4:])[0]
|
||||
|
||||
payload = self._receive_bytes(message_size)
|
||||
data = utils.BytearrayStream(header + payload)
|
||||
|
||||
return data
|
||||
|
||||
def _receive_bytes(self, message_size):
|
||||
bytes_received = 0
|
||||
message = b''
|
||||
|
||||
while bytes_received < message_size:
|
||||
partial_message = self._connection.recv(
|
||||
min(message_size - bytes_received, self._max_buffer_size)
|
||||
)
|
||||
|
||||
if partial_message is None:
|
||||
break
|
||||
else:
|
||||
bytes_received += len(partial_message)
|
||||
message += partial_message
|
||||
|
||||
if bytes_received != message_size:
|
||||
raise ValueError(
|
||||
"Invalid KMIP message received. Actual message length "
|
||||
"does not match the advertised header length."
|
||||
)
|
||||
else:
|
||||
return message
|
||||
|
||||
def _send_response(self, data):
|
||||
if len(data) > 0:
|
||||
self._connection.sendall(bytes(data))
|
|
@ -0,0 +1,248 @@
|
|||
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import socket
|
||||
import testtools
|
||||
|
||||
from kmip.core import utils
|
||||
from kmip.services.server import session
|
||||
|
||||
|
||||
class TestKmipSession(testtools.TestCase):
|
||||
"""
|
||||
A test suite for the KmipSession.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestKmipSession, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestKmipSession, self).tearDown()
|
||||
|
||||
def test_init(self):
|
||||
"""
|
||||
Test that a KmipSession can be created without errors.
|
||||
"""
|
||||
session.KmipSession(None, None, 'name')
|
||||
|
||||
def test_run(self):
|
||||
"""
|
||||
Test that the message handling loop is handled properly on normal
|
||||
execution.
|
||||
"""
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._logger = mock.MagicMock()
|
||||
kmip_session._handle_message_loop = mock.MagicMock()
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
|
||||
kmip_session.run()
|
||||
|
||||
kmip_session._logger.info.assert_any_call("Starting session: name")
|
||||
kmip_session._handle_message_loop.assert_called_once_with()
|
||||
kmip_session._connection.shutdown.assert_called_once_with(
|
||||
socket.SHUT_RDWR
|
||||
)
|
||||
kmip_session._connection.close.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_called_with("Stopping session: name")
|
||||
|
||||
def test_run_with_failure(self):
|
||||
"""
|
||||
Test that the correct logging and error handling occurs when the
|
||||
thread encounters an error with the message handling loop.
|
||||
"""
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._logger = mock.MagicMock()
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
|
||||
test_exception = Exception("test")
|
||||
kmip_session._handle_message_loop = mock.MagicMock(
|
||||
side_effect=test_exception
|
||||
)
|
||||
|
||||
kmip_session.run()
|
||||
|
||||
kmip_session._logger.info.assert_any_call("Starting session: name")
|
||||
kmip_session._handle_message_loop.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_any_call(
|
||||
"Failure handling message loop"
|
||||
)
|
||||
kmip_session._logger.exception.assert_called_once_with(test_exception)
|
||||
kmip_session._connection.shutdown.assert_called_once_with(
|
||||
socket.SHUT_RDWR
|
||||
)
|
||||
kmip_session._connection.close.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_called_with("Stopping session: name")
|
||||
|
||||
@mock.patch('kmip.core.messages.messages.RequestMessage')
|
||||
def test_handle_message_loop(self, request_mock):
|
||||
"""
|
||||
Test that the correct logging and error handling occurs during the
|
||||
message handling loop.
|
||||
"""
|
||||
data = utils.BytearrayStream(())
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._logger = mock.MagicMock()
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
kmip_session._receive_request = mock.MagicMock(return_value=data)
|
||||
kmip_session._send_response = mock.MagicMock()
|
||||
|
||||
kmip_session._handle_message_loop()
|
||||
|
||||
kmip_session._receive_request.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_not_called()
|
||||
kmip_session._logger.error.assert_not_called()
|
||||
kmip_session._logger.exception.assert_not_called()
|
||||
self.assertTrue(kmip_session._send_response.called)
|
||||
|
||||
@mock.patch('kmip.core.messages.messages.RequestMessage.read',
|
||||
mock.MagicMock(side_effect=Exception()))
|
||||
def test_handle_message_loop_with_parse_failure(self):
|
||||
"""
|
||||
Test that the correct logging and error handling occurs during the
|
||||
message handling loop.
|
||||
"""
|
||||
data = utils.BytearrayStream(())
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._logger = mock.MagicMock()
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
kmip_session._receive_request = mock.MagicMock(return_value=data)
|
||||
kmip_session._send_response = mock.MagicMock()
|
||||
|
||||
kmip_session._handle_message_loop()
|
||||
|
||||
kmip_session._receive_request.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_called_once_with(
|
||||
"Failure parsing request message."
|
||||
)
|
||||
self.assertTrue(kmip_session._logger.exception.called)
|
||||
kmip_session._logger.error.assert_not_called()
|
||||
self.assertTrue(kmip_session._send_response.called)
|
||||
|
||||
@mock.patch('kmip.core.messages.messages.RequestMessage')
|
||||
def test_handle_message_loop_with_response_too_long(self, request_mock):
|
||||
"""
|
||||
Test that the correct logging and error handling occurs during the
|
||||
message handling loop.
|
||||
"""
|
||||
data = utils.BytearrayStream(())
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._logger = mock.MagicMock()
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
kmip_session._receive_request = mock.MagicMock(return_value=data)
|
||||
kmip_session._send_response = mock.MagicMock()
|
||||
kmip_session._max_response_size = 0
|
||||
|
||||
kmip_session._handle_message_loop()
|
||||
|
||||
kmip_session._receive_request.assert_called_once_with()
|
||||
kmip_session._logger.info.assert_not_called()
|
||||
self.assertTrue(kmip_session._logger.error.called)
|
||||
kmip_session._logger.exception.assert_not_called()
|
||||
self.assertTrue(kmip_session._send_response.called)
|
||||
|
||||
def test_build_error_response(self):
|
||||
"""
|
||||
Test that a default error response can be built correctly.
|
||||
|
||||
TODO (peterhamilton): Remove this test when the KmipEngine is added.
|
||||
"""
|
||||
from kmip.core import enums
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._build_error_response(
|
||||
enums.ResultStatus.OPERATION_FAILED,
|
||||
enums.ResultReason.GENERAL_FAILURE,
|
||||
"General failure message."
|
||||
)
|
||||
|
||||
def test_receive_request(self):
|
||||
"""
|
||||
Test that the session can correctly receive and parse a message
|
||||
encoding.
|
||||
"""
|
||||
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
expected = utils.BytearrayStream((content))
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._receive_bytes = mock.MagicMock(
|
||||
side_effect=[content, b'']
|
||||
)
|
||||
|
||||
observed = kmip_session._receive_request()
|
||||
|
||||
kmip_session._receive_bytes.assert_any_call(8)
|
||||
kmip_session._receive_bytes.assert_any_call(0)
|
||||
|
||||
self.assertEqual(expected.buffer, observed.buffer)
|
||||
|
||||
def test_receive_bytes(self):
|
||||
"""
|
||||
Test that the session can receive a message.
|
||||
"""
|
||||
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
kmip_session._connection.recv = mock.MagicMock(
|
||||
side_effect=[content, content]
|
||||
)
|
||||
|
||||
observed = kmip_session._receive_bytes(16)
|
||||
|
||||
kmip_session._connection.recv.assert_any_call(16)
|
||||
kmip_session._connection.recv.assert_called_with(8)
|
||||
self.assertEqual(content + content, observed)
|
||||
|
||||
def test_receive_bytes_with_bad_length(self):
|
||||
"""
|
||||
Test that the session generates an error on an incorrectly sized
|
||||
message.
|
||||
"""
|
||||
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
kmip_session._connection.recv = mock.MagicMock(
|
||||
side_effect=[content, content, None]
|
||||
)
|
||||
|
||||
args = [32]
|
||||
self.assertRaises(ValueError, kmip_session._receive_bytes, *args)
|
||||
|
||||
kmip_session._connection.recv.assert_any_call(16)
|
||||
kmip_session._connection.recv.assert_called_with(16)
|
||||
|
||||
def test_send_message(self):
|
||||
"""
|
||||
Test that a data buffer, regardless of length, is sent correctly.
|
||||
"""
|
||||
buffer_full = utils.BytearrayStream((
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
))
|
||||
buffer_empty = utils.BytearrayStream()
|
||||
|
||||
kmip_session = session.KmipSession(None, None, 'name')
|
||||
kmip_session._connection = mock.MagicMock()
|
||||
|
||||
kmip_session._send_response(buffer_empty.buffer)
|
||||
kmip_session._connection.sendall.assert_not_called()
|
||||
|
||||
kmip_session._send_response(buffer_full.buffer)
|
||||
kmip_session._connection.sendall.assert_called_once_with(
|
||||
bytes(buffer_full.buffer)
|
||||
)
|
Loading…
Reference in New Issue