Adding session extraction of client identity from certificates

This change updates the KmipSession, allowing it to extract client
identity from the client certificate of a TLS connection. The
certificate subject common name is used as the client identity if
the certificate has client authentication set in the extended key
usage extension.

This change breaks backwards compatibility. If a client certificate
does not define a client identity, the session will reject it and
shutdown the connection. Any client certificates used to connect
with the software server in the past will need to be replaced with
certificates that define a suitable client identity.
This commit is contained in:
Peter Hamilton 2016-09-13 14:48:37 -04:00
parent 40919468a6
commit 8fd6349152
2 changed files with 284 additions and 1 deletions

View File

@ -18,6 +18,9 @@ import socket
import struct import struct
import threading import threading
from cryptography import x509
from cryptography.hazmat import backends
from kmip.core import enums from kmip.core import enums
from kmip.core import exceptions from kmip.core import exceptions
from kmip.core.messages import contents from kmip.core.messages import contents
@ -83,6 +86,59 @@ class KmipSession(threading.Thread):
self._connection.close() self._connection.close()
self._logger.info("Stopping session: {0}".format(self.name)) self._logger.info("Stopping session: {0}".format(self.name))
def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True)
try:
certificate = x509.load_der_x509_certificate(
certificate_data,
backends.default_backend()
)
except Exception:
# This should never get raised "in theory," as the ssl socket
# should fail to connect non-TLS connections before the session
# gets created. This is a failsafe in case that protection fails.
raise exceptions.PermissionDenied(
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity."
)
try:
extended_key_usage = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value
except x509.ExtensionNotFound:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage:
client_identities = certificate.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable."
)
def _handle_message_loop(self): def _handle_message_loop(self):
request_data = self._receive_request() request_data = self._receive_request()
request = messages.RequestMessage() request = messages.RequestMessage()
@ -90,6 +146,7 @@ class KmipSession(threading.Thread):
max_size = self._max_response_size max_size = self._max_response_size
try: try:
client_identity = self._get_client_identity()
request.read(request_data) request.read(request_data)
except Exception as e: except Exception as e:
self._logger.warning("Failure parsing request message.") self._logger.warning("Failure parsing request message.")
@ -103,7 +160,8 @@ class KmipSession(threading.Thread):
else: else:
try: try:
response, max_response_size = self._engine.process_request( response, max_response_size = self._engine.process_request(
request request,
client_identity
) )
if max_response_size: if max_response_size:
max_size = max_response_size max_size = max_response_size

View File

@ -13,6 +13,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime
import mock import mock
import socket import socket
import testtools import testtools
@ -29,6 +37,76 @@ from kmip.services.server import engine
from kmip.services.server import session from kmip.services.server import session
def build_certificate(
common_names,
include_extension=True,
bad_extension=False
):
"""
Programmatically generate a self-signed certificate for testing purposes.
Args:
common_names (list): A list of strings for the common names of the
cert.
include_extension (boolean): A flag enabling/disabling the inclusion
of certificate extensions.
bad_extension (boolean): A flag enabling/disabling the setting of
invalid certificate extension values.
Returns:
x509.Certificate: The newly generated certificate object.
"""
names = []
for common_name in common_names:
names.append(
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)
)
name = x509.Name(names)
t = datetime.datetime.now()
delta = datetime.timedelta(days=30)
not_valid_before = t - delta
not_valid_after = t + delta
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
builder = x509.CertificateBuilder().serial_number(
1
).issuer_name(
name
).subject_name(
name
).not_valid_before(
not_valid_before
).not_valid_after(
not_valid_after
).public_key(
private_key.public_key()
)\
extended_key_usage_values = []
if bad_extension:
extended_key_usage_values.append(
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
)
else:
extended_key_usage_values.append(
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
)
if include_extension:
builder = builder.add_extension(
x509.ExtendedKeyUsage(extended_key_usage_values),
True
)
return builder.sign(private_key, hashes.SHA256(), default_backend())
class TestKmipSession(testtools.TestCase): class TestKmipSession(testtools.TestCase):
""" """
A test suite for the KmipSession. A test suite for the KmipSession.
@ -108,6 +186,145 @@ class TestKmipSession(testtools.TestCase):
kmip_session._connection.close.assert_called_once_with() kmip_session._connection.close.assert_called_once_with()
kmip_session._logger.info.assert_called_with("Stopping session: name") kmip_session._logger.info.assert_called_with("Stopping session: name")
def test_get_client_identity(self):
"""
Test that a client identity is obtained from a valid client
certificate.
"""
client_certificate = build_certificate([u'Test Identity'])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity"
)
kmip_session._logger.warning.assert_not_called()
def test_get_client_identity_with_no_certificate(self):
"""
Test that the right error is generated when no certificate is
available to provide the client identity.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = None
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
is missing its extended key usage extension.
"""
client_certificate = build_certificate([u'Test Identity'], False)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_common_name(self):
"""
Test that the right error is generated when the client certificate
does not define a subject common name.
"""
client_certificate = build_certificate([])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The client certificate does not define a subject common "
"name. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_multiple_common_names(self):
"""
Test that the right client identity is returned when the client
certificate has multiple subject common names.
"""
client_certificate = build_certificate([
u'Test Identity 1',
u'Test Identity 2'
])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity 1', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity 1"
)
kmip_session._logger.warning.assert_called_once_with(
"Multiple client identities found. Using the first one processed."
)
def test_get_client_identity_with_incorrect_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
does not have client authentication set in its extended key usage
extension.
"""
client_certificate = build_certificate(
[u'Test Identity'],
bad_extension=True
)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable.",
kmip_session._get_client_identity
)
@mock.patch('kmip.core.messages.messages.RequestMessage') @mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop(self, request_mock): def test_handle_message_loop(self, request_mock):
""" """
@ -141,6 +358,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine._logger = mock.MagicMock() kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name') kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._engine = mock.MagicMock() kmip_session._engine = mock.MagicMock()
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._engine.process_request = mock.MagicMock( kmip_session._engine.process_request = mock.MagicMock(
return_value=(message, kmip_session._max_response_size) return_value=(message, kmip_session._max_response_size)
) )
@ -168,6 +387,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine() kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name') kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._logger = mock.MagicMock() kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock() kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data) kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -193,6 +414,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine() kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name') kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._logger = mock.MagicMock() kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock() kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data) kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -218,6 +441,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine() kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock() kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name') kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._engine = mock.MagicMock() kmip_session._engine = mock.MagicMock()
test_exception = Exception("Unexpected error.") test_exception = Exception("Unexpected error.")
kmip_session._engine.process_request = mock.MagicMock( kmip_session._engine.process_request = mock.MagicMock(