mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-10-26 17:14:25 +01:00
This change updates the PyKMIP object hierarchy's read/write method signatures to support propagation of the KMIP version. The introduction of KMIP 2.0 introduces future KMIP message encodings that break backwards compatibility; to support this, PyKMIP must know what KMIP version is being used when encoding or decoding an object; the KMIP version residing in the client or server alone is now insufficient. Prior versions of KMIP, namely 1.0 - 1.4, have been backwards compatible, obviating the need for the KMIP version at encode/decode time. Going forward, this is no longer true. The PyKMIP client and server have been updated to include the KMIP version when making calls to read/write, as have the associated test cases covering this functionality.
891 lines
32 KiB
Python
891 lines
32 KiB
Python
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
|
|
|
|
import datetime
|
|
import mock
|
|
import socket
|
|
import testtools
|
|
import time
|
|
|
|
from kmip.core import enums
|
|
from kmip.core import exceptions
|
|
from kmip.core import objects
|
|
from kmip.core import utils
|
|
|
|
from kmip.core.messages import contents
|
|
from kmip.core.messages import messages
|
|
|
|
from kmip.services.server import engine
|
|
from kmip.services.server import session
|
|
|
|
|
|
def build_certificate(
        common_names,
        include_extension=True,
        bad_extension=False
):
    """
    Programmatically generate a self-signed certificate for testing purposes.

    The certificate uses the same name for issuer and subject, carries the
    serial number 1, is valid from 30 days before to 30 days after the time
    of the call, and is signed with a freshly generated 2048-bit RSA key
    using SHA-256.

    Args:
        common_names (list): A list of strings for the common names of the
            cert.
        include_extension (boolean): A flag enabling/disabling the inclusion
            of certificate extensions. When True, a critical extended key
            usage extension is added to the certificate.
        bad_extension (boolean): A flag enabling/disabling the setting of
            invalid certificate extension values. When True, the extended
            key usage is marked for server authentication instead of client
            authentication. Ignored if include_extension is False.

    Returns:
        x509.Certificate: The newly generated certificate object.
    """
    name = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)
        for common_name in common_names
    ])

    t = datetime.datetime.now()
    delta = datetime.timedelta(days=30)
    not_valid_before = t - delta
    not_valid_after = t + delta

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    # Issuer and subject are identical because the certificate is
    # self-signed with the key generated above.
    builder = x509.CertificateBuilder().serial_number(
        1
    ).issuer_name(
        name
    ).subject_name(
        name
    ).not_valid_before(
        not_valid_before
    ).not_valid_after(
        not_valid_after
    ).public_key(
        private_key.public_key()
    )

    if include_extension:
        # A "bad" certificate advertises server authentication where the
        # tests expect client authentication.
        if bad_extension:
            usage = x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
        else:
            usage = x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH

        # The second argument marks the extension as critical.
        builder = builder.add_extension(
            x509.ExtendedKeyUsage([usage]),
            True
        )

    return builder.sign(private_key, hashes.SHA256(), default_backend())
|
|
|
|
|
|
class TestKmipSession(testtools.TestCase):
    """
    A test suite for the KmipSession.

    The tests replace the session's logger, connection, engine and I/O
    helpers with mock objects where they are exercised, and then verify the
    calls the session makes against those mocks.
    """

    def setUp(self):
        super(TestKmipSession, self).setUp()

    def tearDown(self):
        super(TestKmipSession, self).tearDown()

    def test_init(self):
        """
        Test that a KmipSession can be created without errors.
        """
        session.KmipSession(None, None, None, 'name')

    def test_init_without_name(self):
        """
        Test that a KmipSession without 'name' can be created without errors.
        """
        session.KmipSession(None, None, None, None)

    def test_run(self):
        """
        Test that the message handling loop is handled properly on normal
        execution.

        The mocked loop succeeds once and then reports a closed connection;
        the session is expected to log its start and stop and to shut down
        and close the connection exactly once.
        """
        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._logger = mock.MagicMock()
        kmip_session._handle_message_loop = mock.MagicMock(
            side_effect=[
                None,
                exceptions.ConnectionClosed()
            ]
        )
        kmip_session._connection = mock.MagicMock()

        kmip_session.run()

        kmip_session._logger.info.assert_any_call("Starting session: name")
        self.assertTrue(kmip_session._handle_message_loop.called)
        kmip_session._connection.shutdown.assert_called_once_with(
            socket.SHUT_RDWR
        )
        kmip_session._connection.close.assert_called_once_with()
        kmip_session._logger.info.assert_called_with("Stopping session: name")

    def test_run_with_failure(self):
        """
        Test that the correct logging and error handling occurs when the
        thread encounters an error with the message handling loop.

        The mocked loop raises a generic exception first and then reports a
        closed connection; the exception is expected to be logged and the
        connection to be shut down and closed exactly once.
        """
        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()

        test_exception = Exception("test")
        kmip_session._handle_message_loop = mock.MagicMock(
            side_effect=[
                test_exception,
                exceptions.ConnectionClosed()
            ]
        )

        kmip_session.run()

        kmip_session._logger.info.assert_any_call("Starting session: name")
        self.assertTrue(kmip_session._handle_message_loop.called)
        kmip_session._logger.info.assert_any_call(
            "Failure handling message loop"
        )
        kmip_session._logger.exception.assert_called_once_with(test_exception)
        kmip_session._connection.shutdown.assert_called_once_with(
            socket.SHUT_RDWR
        )
        kmip_session._connection.close.assert_called_once_with()
        kmip_session._logger.info.assert_called_with("Stopping session: name")

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop(self, request_mock, cert_mock):
        """
        Test that the correct logging and error handling occurs during the
        message handling loop.

        This is the success path: the client identity and the session
        ciphers are logged, no warning or exception is logged, and a
        response is sent.
        """
        data = utils.BytearrayStream()

        # Build a response and use it as a dummy processing result.
        batch_item = messages.ResponseBatchItem(
            result_status=contents.ResultStatus(
                enums.ResultStatus.SUCCESS
            ),
            result_reason=contents.ResultReason(
                enums.ResultReason.OBJECT_ARCHIVED
            ),
            result_message=contents.ResultMessage("Test message.")
        )
        batch_items = [batch_item]
        header = messages.ResponseHeader(
            protocol_version=contents.ProtocolVersion(1, 0),
            time_stamp=contents.TimeStamp(int(time.time())),
            batch_count=contents.BatchCount(len(batch_items))
        )
        message = messages.ResponseMessage(
            response_header=header,
            batch_items=batch_items
        )

        cert_mock.return_value = 'test_certificate'
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=False
        )
        kmip_session._engine = mock.MagicMock()
        kmip_session._engine.process_request = mock.MagicMock(
            return_value=(
                message,
                kmip_session._max_response_size,
                contents.ProtocolVersion(1, 2)
            )
        )
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._connection.shared_ciphers = mock.MagicMock(
            return_value=[
                ('AES128-SHA256', 'TLSv1/SSLv3', 128),
                ('AES256-SHA256', 'TLSv1/SSLv3', 256)
            ]
        )
        kmip_session._connection.cipher = mock.MagicMock(
            return_value=('AES128-SHA256', 'TLSv1/SSLv3', 128)
        )
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()
        kmip_session.authenticate = mock.MagicMock(
            return_value=("John Doe", ["Group A"])
        )

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        kmip_session._logger.info.assert_any_call(
            "Session client identity: John Doe"
        )
        kmip_session._logger.debug.assert_any_call(
            "Possible session ciphers: 2"
        )
        kmip_session._logger.debug.assert_any_call(
            ('AES128-SHA256', 'TLSv1/SSLv3', 128)
        )
        kmip_session._logger.debug.assert_any_call(
            ('AES256-SHA256', 'TLSv1/SSLv3', 256)
        )
        kmip_session._logger.debug.assert_any_call(
            "Session cipher selected: {0}".format(
                ('AES128-SHA256', 'TLSv1/SSLv3', 128)
            )
        )
        kmip_session._logger.warning.assert_not_called()
        kmip_session._logger.exception.assert_not_called()
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage.read',
                mock.MagicMock(side_effect=Exception()))
    def test_handle_message_loop_with_parse_failure(self, cert_mock):
        """
        Test that the correct logging and error handling occurs when the
        request message cannot be parsed.

        RequestMessage.read is patched to raise; the session is expected to
        log a parsing warning and the exception, log no error, and still
        send a response.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        kmip_engine = engine.KmipEngine()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=False
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session.authenticate.return_value = (
            'test',
            ['group A', 'group B']
        )
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        kmip_session._logger.warning.assert_called_once_with(
            "Failure parsing request message."
        )
        self.assertTrue(kmip_session._logger.exception.called)
        kmip_session._logger.error.assert_not_called()
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_with_response_too_long(self,
                                                        request_mock,
                                                        cert_mock):
        """
        Test that the correct logging and error handling occurs when the
        response is too long.

        The session's maximum response size is forced to zero; the session
        is expected to log a warning, log no exception, and still send a
        response.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        kmip_engine = engine.KmipEngine()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=False
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session.authenticate.return_value = (
            'test',
            ['group A', 'group B']
        )
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()
        kmip_session._max_response_size = 0

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        self.assertTrue(kmip_session._logger.warning.called)
        kmip_session._logger.exception.assert_not_called()
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_with_unexpected_error(self,
                                                       request_mock,
                                                       cert_mock):
        """
        Test that the correct logging and error handling occurs when an
        unexpected error is generated while processing a request.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=False
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session.authenticate.return_value = (
            'test',
            ['group A', 'group B']
        )
        kmip_session._engine = mock.MagicMock()
        test_exception = Exception("Unexpected error.")
        kmip_session._engine.process_request = mock.MagicMock(
            side_effect=test_exception
        )
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        kmip_session._logger.warning.assert_called_once_with(
            "An unexpected error occurred while processing request."
        )
        kmip_session._logger.exception.assert_called_once_with(test_exception)
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_with_authentication_failure(self,
                                                             request_mock,
                                                             cert_mock):
        """
        Test that the correct logging and error handling occurs when an
        authentication error is generated while processing a request.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=False
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session.authenticate.side_effect = exceptions.PermissionDenied(
            "Authentication failed."
        )
        kmip_session._engine = mock.MagicMock()
        # The mocked engine must still expose the real default protocol
        # version of the engine it replaces.
        kmip_session._engine.default_protocol_version = (
            kmip_engine.default_protocol_version
        )
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()
        fake_version = contents.ProtocolVersion(1, 2)
        fake_credential = objects.Credential(
            credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
            credential_value=objects.UsernamePasswordCredential(
                username="John Doe",
                password="secret"
            )
        )
        fake_header = messages.RequestHeader(
            protocol_version=fake_version,
            authentication=contents.Authentication(
                credentials=[fake_credential]
            )
        )
        fake_request = messages.RequestMessage()
        fake_request.request_header = fake_header
        fake_request.read = mock.MagicMock()
        request_mock.return_value = fake_request

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        fake_request.read.assert_called_once_with(
            data,
            kmip_version=enums.KMIPVersion.KMIP_1_2
        )
        kmip_session.authenticate.assert_called_once_with(
            "test_certificate",
            fake_request
        )
        kmip_session._logger.warning.assert_called_once_with(
            "Authentication failed."
        )
        kmip_session._engine.build_error_response.assert_called_once_with(
            fake_version,
            enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
            "An error occurred during client authentication. "
            "See server logs for more information."
        )
        kmip_session._logger.exception.assert_not_called()
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_no_certificate(self,
                                                request_mock,
                                                cert_mock):
        """
        Test that the correct logging and error handling occurs when no
        certificate is encountered while processing a request.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = None
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=True
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session._engine = mock.MagicMock()
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        # Assert on the mock; calling it directly would verify nothing.
        kmip_session._logger.warning.assert_any_call(
            "Failure verifying the client certificate."
        )
        kmip_session._logger.exception.assert_called_once_with(
            exceptions.PermissionDenied(
                "The client certificate could not be loaded from the session "
                "connection."
            )
        )
        kmip_session._engine.build_error_response.assert_called_once_with(
            contents.ProtocolVersion(1, 0),
            enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
            "Error verifying the client certificate. "
            "See server logs for more information."
        )
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch(
        'kmip.services.server.auth.get_extended_key_usage_from_certificate'
    )
    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_no_certificate_extension(self,
                                                          request_mock,
                                                          cert_mock,
                                                          ext_mock):
        """
        Test that the correct logging and error handling occurs when a
        certificate without an extended key usage extension is encountered
        while processing a request.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        ext_mock.return_value = None
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=True
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session._engine = mock.MagicMock()
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        # Assert on the mock; calling it directly would verify nothing.
        kmip_session._logger.warning.assert_any_call(
            "Failure verifying the client certificate."
        )
        kmip_session._logger.exception.assert_called_once_with(
            exceptions.PermissionDenied(
                "The extended key usage extension is missing from the client "
                "certificate."
            )
        )
        kmip_session._engine.build_error_response.assert_called_once_with(
            contents.ProtocolVersion(1, 0),
            enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
            "Error verifying the client certificate. "
            "See server logs for more information."
        )
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch(
        'kmip.services.server.auth.get_extended_key_usage_from_certificate'
    )
    @mock.patch('kmip.services.server.auth.get_certificate_from_connection')
    @mock.patch('kmip.core.messages.messages.RequestMessage')
    def test_handle_message_loop_invalid_certificate_extension(self,
                                                               request_mock,
                                                               cert_mock,
                                                               ext_mock):
        """
        Test that the correct logging and error handling occurs when a
        certificate whose extended key usage extension is not marked for
        client authentication is encountered while processing a request.
        """
        data = utils.BytearrayStream(())

        cert_mock.return_value = 'test_certificate'
        ext_mock.return_value = []
        kmip_engine = engine.KmipEngine()
        kmip_engine._logger = mock.MagicMock()
        kmip_session = session.KmipSession(
            kmip_engine,
            None,
            None,
            name='name',
            enable_tls_client_auth=True
        )
        kmip_session.authenticate = mock.MagicMock()
        kmip_session._engine = mock.MagicMock()
        kmip_session._logger = mock.MagicMock()
        kmip_session._connection = mock.MagicMock()
        kmip_session._receive_request = mock.MagicMock(return_value=data)
        kmip_session._send_response = mock.MagicMock()

        kmip_session._handle_message_loop()

        kmip_session._receive_request.assert_called_once_with()
        # Assert on the mock; calling it directly would verify nothing.
        kmip_session._logger.warning.assert_any_call(
            "Failure verifying the client certificate."
        )
        kmip_session._logger.exception.assert_called_once_with(
            exceptions.PermissionDenied(
                "The extended key usage extension is not marked for client "
                "authentication in the client certificate."
            )
        )
        kmip_session._engine.build_error_response.assert_called_once_with(
            contents.ProtocolVersion(1, 0),
            enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
            "Error verifying the client certificate. "
            "See server logs for more information."
        )
        self.assertTrue(kmip_session._send_response.called)

    @mock.patch(
        "kmip.services.server.auth.get_client_identity_from_certificate"
    )
    def test_authenticate(self, mock_get):
        """
        Test that the session correctly uses the authentication plugin
        framework to authenticate new connections.

        No plugins are configured here, so the identity is expected to be
        extracted from the certificate and returned without any groups.
        """
        mock_get.return_value = "John Doe"
        kmip_session = session.KmipSession(
            None,
            None,
            None,
            name='TestSession'
        )
        kmip_session._logger = mock.MagicMock()
        fake_request = messages.RequestMessage(
            request_header=messages.RequestHeader()
        )

        session_identity = kmip_session.authenticate(
            "fake_certificate",
            fake_request
        )

        kmip_session._logger.debug.assert_any_call(
            "No authentication plugins are enabled. The client identity will "
            "be extracted from the client certificate."
        )
        mock_get.assert_any_call("fake_certificate")
        kmip_session._logger.debug.assert_any_call(
            "Extraction succeeded for client identity: John Doe"
        )
        self.assertEqual(("John Doe", None), session_identity)

    @mock.patch("kmip.services.server.auth.SLUGSConnector")
    def test_authenticate_against_slugs(self, mock_connector):
        """
        Test that the session correctly handles authentication with SLUGS.
        """
        mock_instance = mock.MagicMock()
        mock_instance.authenticate.return_value = ("John Doe", ["Group A"])
        mock_connector.return_value = mock_instance
        kmip_session = session.KmipSession(
            None,
            None,
            ("127.0.0.1", 48026),
            name='TestSession',
            auth_settings=[(
                "auth:slugs",
                {"enabled": "True", "url": "test_url"}
            )]
        )
        kmip_session._logger = mock.MagicMock()
        fake_credential = objects.Credential(
            credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
            credential_value=objects.UsernamePasswordCredential(
                username="John Doe",
                password="secret"
            )
        )
        fake_request = messages.RequestMessage(
            request_header=messages.RequestHeader(
                authentication=contents.Authentication(
                    credentials=[fake_credential]
                )
            )
        )

        result = kmip_session.authenticate(
            "fake_certificate",
            fake_request
        )

        mock_connector.assert_any_call("test_url")
        kmip_session._logger.debug.assert_any_call(
            "Authenticating with plugin: auth:slugs"
        )
        mock_instance.authenticate.assert_any_call(
            "fake_certificate",
            (("127.0.0.1", 48026), kmip_session._session_time),
            fake_request.request_header.authentication.credentials
        )
        # Assert on the mock; calling it directly would verify nothing.
        kmip_session._logger.debug.assert_any_call(
            "Authentication succeeded for client identity: John Doe"
        )
        self.assertEqual(2, len(result))
        self.assertEqual("John Doe", result[0])
        self.assertEqual(["Group A"], result[1])

    @mock.patch("kmip.services.server.auth.SLUGSConnector")
    def test_authenticate_against_slugs_with_failure(self, mock_connector):
        """
        Test that the session correctly handles a SLUGS authentication error.
        """
        mock_instance = mock.MagicMock()
        test_exception = exceptions.PermissionDenied(
            "Unrecognized user ID: John Doe"
        )
        mock_instance.authenticate.side_effect = test_exception
        mock_connector.return_value = mock_instance
        kmip_session = session.KmipSession(
            None,
            None,
            ("127.0.0.1", 48026),
            name='TestSession',
            auth_settings=[(
                "auth:slugs",
                {"enabled": "True", "url": "test_url"}
            )]
        )
        kmip_session._logger = mock.MagicMock()
        fake_credential = objects.Credential(
            credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
            credential_value=objects.UsernamePasswordCredential(
                username="John Doe",
                password="secret"
            )
        )
        fake_request = messages.RequestMessage(
            request_header=messages.RequestHeader(
                authentication=contents.Authentication(
                    credentials=[fake_credential]
                )
            )
        )

        args = ("fake_certificate", fake_request)
        self.assertRaisesRegex(
            exceptions.PermissionDenied,
            "Authentication failed.",
            kmip_session.authenticate,
            *args
        )

        mock_connector.assert_any_call("test_url")
        kmip_session._logger.debug.assert_any_call(
            "Authenticating with plugin: auth:slugs"
        )
        kmip_session._logger.warning.assert_any_call("Authentication failed.")
        kmip_session._logger.exception.assert_any_call(test_exception)

    def test_authenticate_against_unrecognized_plugin(self):
        """
        Test that the session correctly handles an unrecognized plugin
        configuration.
        """
        kmip_session = session.KmipSession(
            None,
            None,
            None,
            name='TestSession',
            auth_settings=[("auth:unrecognized", {})]
        )
        kmip_session._logger = mock.MagicMock()
        fake_request = messages.RequestMessage(
            request_header=messages.RequestHeader()
        )

        args = ("fake_certificate", fake_request)
        self.assertRaisesRegex(
            exceptions.PermissionDenied,
            "Authentication failed.",
            kmip_session.authenticate,
            *args
        )

        kmip_session._logger.warning.assert_any_call(
            "Authentication plugin 'auth:unrecognized' is not supported."
        )

    def test_receive_request(self):
        """
        Test that the session can correctly receive and parse a message
        encoding.

        The mocked byte reader returns an eight-byte, all-zero block first
        and an empty payload second; the test expects reads of 8 and then 0
        bytes and a stream containing only the first block.
        """
        content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
        expected = utils.BytearrayStream(content)

        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._receive_bytes = mock.MagicMock(
            side_effect=[content, b'']
        )

        observed = kmip_session._receive_request()

        kmip_session._receive_bytes.assert_any_call(8)
        kmip_session._receive_bytes.assert_any_call(0)

        self.assertEqual(expected.buffer, observed.buffer)

    def test_receive_bytes(self):
        """
        Test that the session can receive a message.

        A 16-byte read is served by two 8-byte chunks, and an empty chunk is
        expected to raise ConnectionClosed.
        """
        content = b'\x00\x00\x00\x00\x00\x00\x00\x00'

        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._connection = mock.MagicMock()
        kmip_session._connection.recv = mock.MagicMock(
            side_effect=[content, content]
        )

        observed = kmip_session._receive_bytes(16)

        # The first read asks for everything; the second only for the
        # remaining half.
        kmip_session._connection.recv.assert_any_call(16)
        kmip_session._connection.recv.assert_called_with(8)
        self.assertEqual(content + content, observed)

        kmip_session._connection.recv = mock.MagicMock(
            side_effect=['']
        )

        args = (8, )
        self.assertRaises(
            exceptions.ConnectionClosed,
            kmip_session._receive_bytes,
            *args
        )

    def test_receive_bytes_with_bad_length(self):
        """
        Test that the session generates an error on an incorrectly sized
        message.

        Only 16 of the 32 requested bytes arrive before recv returns None,
        which is expected to raise a ValueError.
        """
        content = b'\x00\x00\x00\x00\x00\x00\x00\x00'

        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._connection = mock.MagicMock()
        kmip_session._connection.recv = mock.MagicMock(
            side_effect=[content, content, None]
        )

        args = [32]
        self.assertRaises(ValueError, kmip_session._receive_bytes, *args)

        kmip_session._connection.recv.assert_any_call(16)
        kmip_session._connection.recv.assert_called_with(16)

    def test_send_message(self):
        """
        Test that a data buffer, regardless of length, is sent correctly.

        An empty buffer must not be sent at all; a non-empty buffer must be
        sent exactly once as bytes.
        """
        buffer_full = utils.BytearrayStream(
            b'\x00\x00\x00\x00\x00\x00\x00\x00'
        )
        buffer_empty = utils.BytearrayStream()

        kmip_session = session.KmipSession(None, None, None, 'name')
        kmip_session._connection = mock.MagicMock()

        kmip_session._send_response(buffer_empty.buffer)
        kmip_session._connection.sendall.assert_not_called()

        kmip_session._send_response(buffer_full.buffer)
        kmip_session._connection.sendall.assert_called_once_with(
            bytes(buffer_full.buffer)
        )