Merge pull request #407 from OpenKMIP/feat/update-session-auth

Update the server session to use the auth plugin framework
This commit is contained in:
Peter Hamilton 2018-03-30 09:38:58 -04:00 committed by GitHub
commit 6bda8ec999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 640 additions and 247 deletions

View File

@ -38,6 +38,25 @@ class KmipError(Exception):
self.status = status
self.reason = reason
def __eq__(self, other):
if isinstance(other, KmipError):
if str(self) != str(other):
return False
elif self.status != other.status:
return False
elif self.reason != other.reason:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, KmipError):
return self == other
else:
return NotImplemented
class CryptographicFailure(KmipError):
"""

View File

@ -335,9 +335,6 @@ class KmipEngine(object):
else:
auth_credentials = None
# TODO (peter-hamilton) This is a shim until SLUGS integration is done.
credential = [credential, None]
self._verify_credential(auth_credentials, credential)
# Process the batch error continuation option

View File

@ -398,10 +398,12 @@ class KmipServer(object):
s = session.KmipSession(
self._engine,
connection,
address,
name=session_name,
enable_tls_client_auth=self.config.settings.get(
'enable_tls_client_auth'
)
),
auth_settings=self.config.settings.get('auth_plugins')
)
s.daemon = True
s.start()

View File

@ -17,9 +17,9 @@ import logging
import socket
import struct
import threading
import time
from cryptography import x509
from cryptography.hazmat import backends
from kmip.core import enums
from kmip.core import exceptions
@ -27,6 +27,8 @@ from kmip.core.messages import contents
from kmip.core.messages import messages
from kmip.core import utils
from kmip.services.server import auth
class KmipSession(threading.Thread):
"""
@ -36,8 +38,10 @@ class KmipSession(threading.Thread):
def __init__(self,
engine,
connection,
address,
name=None,
enable_tls_client_auth=True):
enable_tls_client_auth=True,
auth_settings=None):
"""
Create a KmipSession.
@ -46,12 +50,19 @@ class KmipSession(threading.Thread):
that handles message processing. Required.
connection (socket): A client socket.socket TLS connection
representing a new KMIP connection. Required.
address (tuple): The address tuple produced with the session
connection. Contains the IP address and port number of the
remote connection endpoint. Required.
name (str): The name of the KmipSession. Optional, defaults to
None.
enable_tls_client_auth (bool): A flag that enables a strict check
for the client auth flag in the extended key usage extension
in client certificates when establishing the client/server TLS
connection. Optional, defaults to True.
auth_settings (list): A list of tuples, each containing (1) the
name of the 'auth:' settings block from the server config file,
and (2) a dictionary of configuration settings for a specific
authentication plugin. Optional, defaults to None.
"""
super(KmipSession, self).__init__(
group=None,
@ -67,9 +78,12 @@ class KmipSession(threading.Thread):
self._engine = engine
self._connection = connection
self._address = address
self._enable_tls_client_auth = enable_tls_client_auth
self._auth_settings = [] if auth_settings is None else auth_settings
self._session_time = time.time()
self._max_buffer_size = 4096
self._max_request_size = 1048576
self._max_response_size = 1048576
@ -96,61 +110,6 @@ class KmipSession(threading.Thread):
self._connection.close()
self._logger.info("Stopping session: {0}".format(self.name))
def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True)
try:
cert = x509.load_der_x509_certificate(
certificate_data,
backends.default_backend()
)
except Exception:
# This should never get raised "in theory," as the ssl socket
# should fail to connect non-TLS connections before the session
# gets created. This is a failsafe in case that protection fails.
raise exceptions.PermissionDenied(
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity."
)
if self._enable_tls_client_auth:
try:
extended_key_usage = cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value
except x509.ExtensionNotFound:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the "
"client certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in \
extended_key_usage:
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for "
"client authentication in the client certificate. Session "
"client identity unavailable."
)
client_identities = cert.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
def _handle_message_loop(self):
request_data = self._receive_request()
request = messages.RequestMessage()
@ -170,8 +129,41 @@ class KmipSession(threading.Thread):
self._connection.cipher()
)
)
client_identity = self._get_client_identity()
certificate = auth.get_certificate_from_connection(
self._connection
)
if certificate is None:
raise exceptions.PermissionDenied(
"The client certificate could not be loaded from the "
"session connection."
)
if self._enable_tls_client_auth:
extension = auth.get_extended_key_usage_from_certificate(
certificate
)
if extension is None:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from "
"the client certificate."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in extension:
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for "
"client authentication in the client certificate."
)
request.read(request_data)
except exceptions.PermissionDenied as e:
self._logger.warning("Failure verifying the client certificate.")
self._logger.exception(e)
response = self._engine.build_error_response(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
except Exception as e:
self._logger.warning("Failure parsing request message.")
self._logger.exception(e)
@ -183,29 +175,44 @@ class KmipSession(threading.Thread):
)
else:
try:
response, max_response_size = self._engine.process_request(
request,
client_identity
client_identity = self.authenticate(certificate, request)
self._logger.info(
"Session client identity: {}".format(client_identity[0])
)
if max_response_size:
max_size = max_response_size
except exceptions.KmipError as e:
except Exception:
self._logger.warning("Authentication failed.")
response = self._engine.build_error_response(
request.request_header.protocol_version,
e.reason,
str(e)
)
except Exception as e:
self._logger.warning(
"An unexpected error occurred while processing request."
)
self._logger.exception(e)
response = self._engine.build_error_response(
request.request_header.protocol_version,
enums.ResultReason.GENERAL_FAILURE,
"An unexpected error occurred while processing request. "
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"An error occurred during client authentication. "
"See server logs for more information."
)
else:
try:
response, max_response_size = self._engine.process_request(
request,
client_identity
)
if max_response_size:
max_size = max_response_size
except exceptions.KmipError as e:
response = self._engine.build_error_response(
request.request_header.protocol_version,
e.reason,
str(e)
)
except Exception as e:
self._logger.warning(
"An unexpected error occurred while processing "
"request."
)
self._logger.exception(e)
response = self._engine.build_error_response(
request.request_header.protocol_version,
enums.ResultReason.GENERAL_FAILURE,
"An unexpected error occurred while processing "
"request. See server logs for more information."
)
response_data = utils.BytearrayStream()
response.write(response_data)
@ -229,6 +236,68 @@ class KmipSession(threading.Thread):
self._send_response(response_data.buffer)
def authenticate(self, certificate, request):
credentials = []
if request.request_header.authentication is not None:
credentials = request.request_header.authentication.credentials
plugin_enabled = False
for auth_settings in self._auth_settings:
plugin_name, plugin_config = auth_settings
if plugin_name.startswith("auth:slugs"):
if plugin_config.get("enabled") == "True":
plugin_enabled = True
plugin = auth.SLUGSConnector(plugin_config.get("url"))
self._logger.debug(
"Authenticating with plugin: {}".format(plugin_name)
)
try:
client_identity = plugin.authenticate(
certificate,
(self._address, self._session_time),
credentials
)
except Exception as e:
self._logger.warning(
"Authentication failed."
)
self._logger.exception(e)
else:
self._logger.debug(
"Authentication succeeded for client identity: "
"{}".format(client_identity[0])
)
return client_identity
else:
self._logger.warning(
"Authentication plugin '{}' is not "
"supported.".format(plugin_name)
)
if not plugin_enabled:
self._logger.debug(
"No authentication plugins are enabled. The client identity "
"will be extracted from the client certificate."
)
try:
client_identity = auth.get_client_identity_from_certificate(
certificate
)
except Exception as e:
self._logger.warning("Client identity extraction failed.")
self._logger.exception(e)
else:
self._logger.debug(
"Extraction succeeded for client identity: {}".format(
client_identity
)
)
return tuple([client_identity, None])
raise exceptions.PermissionDenied("Authentication failed.")
def _receive_request(self):
header = self._receive_bytes(8)
message_size = struct.unpack('!I', header[4:])[0]

View File

@ -16,7 +16,6 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
@ -28,6 +27,7 @@ import time
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import objects
from kmip.core import utils
from kmip.core.messages import contents
@ -122,20 +122,20 @@ class TestKmipSession(testtools.TestCase):
"""
Test that a KmipSession can be created without errors.
"""
session.KmipSession(None, None, 'name')
session.KmipSession(None, None, None, 'name')
def test_init_without_name(self):
"""
Test that a KmipSession without 'name' can be created without errors.
"""
session.KmipSession(None, None, None)
session.KmipSession(None, None, None, None)
def test_run(self):
"""
Test that the message handling loop is handled properly on normal
execution.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._handle_message_loop = mock.MagicMock(
side_effect=[
@ -160,7 +160,7 @@ class TestKmipSession(testtools.TestCase):
Test that the correct logging and error handling occurs when the
thread encounters an error with the message handling loop.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
@ -186,147 +186,9 @@ class TestKmipSession(testtools.TestCase):
kmip_session._connection.close.assert_called_once_with()
kmip_session._logger.info.assert_called_with("Stopping session: name")
def test_get_client_identity(self):
"""
Test that a client identity is obtained from a valid client
certificate.
"""
client_certificate = build_certificate([u'Test Identity'])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity"
)
kmip_session._logger.warning.assert_not_called()
def test_get_client_identity_with_no_certificate(self):
"""
Test that the right error is generated when no certificate is
available to provide the client identity.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = None
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
is missing its extended key usage extension.
"""
client_certificate = build_certificate([u'Test Identity'], False)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_common_name(self):
"""
Test that the right error is generated when the client certificate
does not define a subject common name.
"""
client_certificate = build_certificate([])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The client certificate does not define a subject common "
"name. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_multiple_common_names(self):
"""
Test that the right client identity is returned when the client
certificate has multiple subject common names.
"""
client_certificate = build_certificate([
u'Test Identity 1',
u'Test Identity 2'
])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity 1', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity 1"
)
kmip_session._logger.warning.assert_called_once_with(
"Multiple client identities found. Using the first one processed."
)
def test_get_client_identity_with_incorrect_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
does not have client authentication set in its extended key usage
extension.
"""
client_certificate = build_certificate(
[u'Test Identity'],
bad_extension=True
)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable.",
kmip_session._get_client_identity
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop(self, request_mock):
def test_handle_message_loop(self, request_mock, cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
@ -354,12 +216,22 @@ class TestKmipSession(testtools.TestCase):
batch_items=batch_items
)
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session._engine = mock.MagicMock()
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._engine.process_request = mock.MagicMock(
return_value=(message, kmip_session._max_response_size)
)
@ -376,11 +248,16 @@ class TestKmipSession(testtools.TestCase):
)
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session.authenticate = mock.MagicMock(
return_value=("John Doe", ["Group A"])
)
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.info.assert_not_called()
kmip_session._logger.info.assert_any_call(
"Session client identity: John Doe"
)
kmip_session._logger.debug.assert_any_call(
"Possible session ciphers: 2"
)
@ -399,19 +276,30 @@ class TestKmipSession(testtools.TestCase):
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage.read',
mock.MagicMock(side_effect=Exception()))
def test_handle_message_loop_with_parse_failure(self):
def test_handle_message_loop_with_parse_failure(self, cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -427,18 +315,31 @@ class TestKmipSession(testtools.TestCase):
kmip_session._logger.error.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_response_too_long(self, request_mock):
def test_handle_message_loop_with_response_too_long(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -448,24 +349,36 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
# kmip_session._logger.info.assert_not_called()
self.assertTrue(kmip_session._logger.warning.called)
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_unexpected_error(self, request_mock):
def test_handle_message_loop_with_unexpected_error(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when an
unexpected error is generated while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._engine = mock.MagicMock()
test_exception = Exception("Unexpected error.")
kmip_session._engine.process_request = mock.MagicMock(
@ -479,13 +392,406 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
# kmip_session._logger.info.assert_not_called()
kmip_session._logger.warning.assert_called_once_with(
"An unexpected error occurred while processing request."
)
kmip_session._logger.exception.assert_called_once_with(test_exception)
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_authentication_failure(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when an
authentication error is generated while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.side_effect = exceptions.PermissionDenied(
"Authentication failed."
)
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
fake_version = contents.ProtocolVersion(1, 2)
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_header = messages.RequestHeader(
protocol_version=fake_version,
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
fake_request = messages.RequestMessage()
fake_request.request_header = fake_header
fake_request.read = mock.MagicMock()
request_mock.return_value = fake_request
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
fake_request.read.assert_called_once_with(data)
kmip_session.authenticate.assert_called_once_with(
"test_certificate",
fake_request
)
kmip_session._logger.warning.assert_called_once_with(
"Authentication failed."
)
kmip_session._engine.build_error_response.assert_called_once_with(
fake_version,
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"An error occurred during client authentication. "
"See server logs for more information."
)
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_no_certificate(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when no
certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = None
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The client certificate could not be loaded from the session "
"connection."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
'kmip.services.server.auth.get_extended_key_usage_from_certificate'
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_no_certificate_extension(self,
request_mock,
cert_mock,
ext_mock):
"""
Test that the correct logging and error handling occurs when an
invalid certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
ext_mock.return_value = None
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
'kmip.services.server.auth.get_extended_key_usage_from_certificate'
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_invalid_certificate_extension(self,
request_mock,
cert_mock,
ext_mock):
"""
Test that the correct logging and error handling occurs when an
invalid certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
ext_mock.return_value = []
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
"kmip.services.server.auth.get_client_identity_from_certificate"
)
def test_authenticate(self, mock_get):
"""
Test that the session correctly uses the authentication plugin
framework to authenticate new connections.
"""
mock_get.return_value = "John Doe"
kmip_session = session.KmipSession(
None,
None,
None,
name='TestSession'
)
kmip_session._logger = mock.MagicMock()
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader()
)
session_identity = kmip_session.authenticate(
"fake_certificate",
fake_request
)
kmip_session._logger.debug.assert_any_call(
"No authentication plugins are enabled. The client identity will "
"be extracted from the client certificate."
)
mock_get.assert_any_call("fake_certificate")
kmip_session._logger.debug.assert_any_call(
"Extraction succeeded for client identity: John Doe"
)
self.assertEqual(("John Doe", None), session_identity)
@mock.patch("kmip.services.server.auth.SLUGSConnector")
def test_authenticate_against_slugs(self, mock_connector):
"""
Test that the session correctly handles authentication with SLUGS.
"""
mock_instance = mock.MagicMock()
mock_instance.authenticate.return_value = ("John Doe", ["Group A"])
mock_connector.return_value = mock_instance
kmip_session = session.KmipSession(
None,
None,
("127.0.0.1", 48026),
name='TestSession',
auth_settings=[(
"auth:slugs",
{"enabled": "True", "url": "test_url"}
)]
)
kmip_session._logger = mock.MagicMock()
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader(
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
)
result = kmip_session.authenticate(
"fake_certificate",
fake_request
)
mock_connector.assert_any_call("test_url")
kmip_session._logger.debug.assert_any_call(
"Authenticating with plugin: auth:slugs"
)
mock_instance.authenticate.assert_any_call(
"fake_certificate",
(("127.0.0.1", 48026), kmip_session._session_time),
fake_request.request_header.authentication.credentials
)
kmip_session._logger.debug(
"Authentication succeeded for client identity: John Doe"
)
self.assertEqual(2, len(result))
self.assertEqual("John Doe", result[0])
self.assertEqual(["Group A"], result[1])
@mock.patch("kmip.services.server.auth.SLUGSConnector")
def test_authenticate_against_slugs_with_failure(self, mock_connector):
"""
Test that the session correctly handles a SLUGS authentication error.
"""
mock_instance = mock.MagicMock()
test_exception = exceptions.PermissionDenied(
"Unrecognized user ID: John Doe"
)
mock_instance.authenticate.side_effect = test_exception
mock_connector.return_value = mock_instance
kmip_session = session.KmipSession(
None,
None,
("127.0.0.1", 48026),
name='TestSession',
auth_settings=[(
"auth:slugs",
{"enabled": "True", "url": "test_url"}
)]
)
kmip_session._logger = mock.MagicMock()
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader(
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
)
args = ("fake_certificate", fake_request)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Authentication failed.",
kmip_session.authenticate,
*args
)
mock_connector.assert_any_call("test_url")
kmip_session._logger.debug.assert_any_call(
"Authenticating with plugin: auth:slugs"
)
kmip_session._logger.warning.assert_any_call("Authentication failed.")
kmip_session._logger.exception.assert_any_call(test_exception)
def test_authenticate_against_unrecognized_plugin(self):
"""
Test that the session correctly handles an unrecognized plugin
configuration.
"""
kmip_session = session.KmipSession(
None,
None,
None,
name='TestSession',
auth_settings=[("auth:unrecognized", {})]
)
kmip_session._logger = mock.MagicMock()
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader()
)
args = ("fake_certificate", fake_request)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Authentication failed.",
kmip_session.authenticate,
*args
)
kmip_session._logger.warning.assert_any_call(
"Authentication plugin 'auth:unrecognized' is not supported."
)
def test_receive_request(self):
"""
Test that the session can correctly receive and parse a message
@ -494,7 +800,7 @@ class TestKmipSession(testtools.TestCase):
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
expected = utils.BytearrayStream((content))
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._receive_bytes = mock.MagicMock(
side_effect=[content, b'']
)
@ -512,7 +818,7 @@ class TestKmipSession(testtools.TestCase):
"""
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._connection.recv = mock.MagicMock(
side_effect=[content, content]
@ -542,7 +848,7 @@ class TestKmipSession(testtools.TestCase):
"""
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._connection.recv = mock.MagicMock(
side_effect=[content, content, None]
@ -563,7 +869,7 @@ class TestKmipSession(testtools.TestCase):
))
buffer_empty = utils.BytearrayStream()
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._send_response(buffer_empty.buffer)