Merge pull request #192 from OpenKMIP/feat/add-client-cert-auth

Adding session extraction of client identity from certificates
This commit is contained in:
Peter Hamilton 2016-10-04 14:55:14 -04:00 committed by GitHub
commit bcb5e7d948
2 changed files with 284 additions and 1 deletions

View File

@ -18,6 +18,9 @@ import socket
import struct
import threading
from cryptography import x509
from cryptography.hazmat import backends
from kmip.core import enums
from kmip.core import exceptions
from kmip.core.messages import contents
@ -83,6 +86,59 @@ class KmipSession(threading.Thread):
self._connection.close()
self._logger.info("Stopping session: {0}".format(self.name))
def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True)
try:
certificate = x509.load_der_x509_certificate(
certificate_data,
backends.default_backend()
)
except Exception:
# This should never get raised "in theory," as the ssl socket
# should fail to connect non-TLS connections before the session
# gets created. This is a failsafe in case that protection fails.
raise exceptions.PermissionDenied(
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity."
)
try:
extended_key_usage = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value
except x509.ExtensionNotFound:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage:
client_identities = certificate.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable."
)
def _handle_message_loop(self):
request_data = self._receive_request()
request = messages.RequestMessage()
@ -90,6 +146,7 @@ class KmipSession(threading.Thread):
max_size = self._max_response_size
try:
client_identity = self._get_client_identity()
request.read(request_data)
except Exception as e:
self._logger.warning("Failure parsing request message.")
@ -103,7 +160,8 @@ class KmipSession(threading.Thread):
else:
try:
response, max_response_size = self._engine.process_request(
request
request,
client_identity
)
if max_response_size:
max_size = max_response_size

View File

@ -13,6 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime
import mock
import socket
import testtools
@ -29,6 +37,76 @@ from kmip.services.server import engine
from kmip.services.server import session
def build_certificate(
common_names,
include_extension=True,
bad_extension=False
):
"""
Programmatically generate a self-signed certificate for testing purposes.
Args:
common_names (list): A list of strings for the common names of the
cert.
include_extension (boolean): A flag enabling/disabling the inclusion
of certificate extensions.
bad_extension (boolean): A flag enabling/disabling the setting of
invalid certificate extension values.
Returns:
x509.Certificate: The newly generated certificate object.
"""
names = []
for common_name in common_names:
names.append(
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)
)
name = x509.Name(names)
t = datetime.datetime.now()
delta = datetime.timedelta(days=30)
not_valid_before = t - delta
not_valid_after = t + delta
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
builder = x509.CertificateBuilder().serial_number(
1
).issuer_name(
name
).subject_name(
name
).not_valid_before(
not_valid_before
).not_valid_after(
not_valid_after
).public_key(
private_key.public_key()
)\
extended_key_usage_values = []
if bad_extension:
extended_key_usage_values.append(
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
)
else:
extended_key_usage_values.append(
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
)
if include_extension:
builder = builder.add_extension(
x509.ExtendedKeyUsage(extended_key_usage_values),
True
)
return builder.sign(private_key, hashes.SHA256(), default_backend())
class TestKmipSession(testtools.TestCase):
"""
A test suite for the KmipSession.
@ -108,6 +186,145 @@ class TestKmipSession(testtools.TestCase):
kmip_session._connection.close.assert_called_once_with()
kmip_session._logger.info.assert_called_with("Stopping session: name")
def test_get_client_identity(self):
"""
Test that a client identity is obtained from a valid client
certificate.
"""
client_certificate = build_certificate([u'Test Identity'])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity"
)
kmip_session._logger.warning.assert_not_called()
def test_get_client_identity_with_no_certificate(self):
"""
Test that the right error is generated when no certificate is
available to provide the client identity.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = None
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
is missing its extended key usage extension.
"""
client_certificate = build_certificate([u'Test Identity'], False)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_common_name(self):
"""
Test that the right error is generated when the client certificate
does not define a subject common name.
"""
client_certificate = build_certificate([])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The client certificate does not define a subject common "
"name. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_multiple_common_names(self):
"""
Test that the right client identity is returned when the client
certificate has multiple subject common names.
"""
client_certificate = build_certificate([
u'Test Identity 1',
u'Test Identity 2'
])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity 1', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity 1"
)
kmip_session._logger.warning.assert_called_once_with(
"Multiple client identities found. Using the first one processed."
)
def test_get_client_identity_with_incorrect_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
does not have client authentication set in its extended key usage
extension.
"""
client_certificate = build_certificate(
[u'Test Identity'],
bad_extension=True
)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable.",
kmip_session._get_client_identity
)
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop(self, request_mock):
"""
@ -141,6 +358,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._engine = mock.MagicMock()
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._engine.process_request = mock.MagicMock(
return_value=(message, kmip_session._max_response_size)
)
@ -168,6 +387,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -193,6 +414,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@ -218,6 +441,8 @@ class TestKmipSession(testtools.TestCase):
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session._engine = mock.MagicMock()
test_exception = Exception("Unexpected error.")
kmip_session._engine.process_request = mock.MagicMock(