Add server config option controlling certificate client auth

This change adds a server configuration option to control the
enforcement of TLS certificate client authentication. Before,
client TLS certificates had to include the extended key usage
extension with the clientAuth bit set to be used as sources of
client identity. The new configuration option,
enable_tls_client_auth, allows server admins to enable/disable
this requirement. The configuration setting is optional and the
server defaults to the original enforcing behavior if it is not
set. Admins must explicitly set the option to False to disable
enforcement.
This commit is contained in:
Peter Hamilton 2017-09-05 14:55:49 -04:00
parent 2915bf5f11
commit 4c6bbae452
6 changed files with 158 additions and 41 deletions

View File

@ -6,3 +6,4 @@ key_path=/etc/pykmip/certs/server_private_key.pem
ca_path=/etc/pykmip/certs/server_ca_cert.pem ca_path=/etc/pykmip/certs/server_ca_cert.pem
auth_suite=Basic auth_suite=Basic
policy_path=/etc/pykmip/policies policy_path=/etc/pykmip/policies
enable_tls_client_auth=True

View File

@ -34,6 +34,7 @@ class KmipServerConfig(object):
self._logger = logging.getLogger('kmip.server.config') self._logger = logging.getLogger('kmip.server.config')
self.settings = dict() self.settings = dict()
self.settings['enable_tls_client_auth'] = True
self._expected_settings = [ self._expected_settings = [
'hostname', 'hostname',
@ -44,7 +45,8 @@ class KmipServerConfig(object):
'auth_suite' 'auth_suite'
] ]
self._optional_settings = [ self._optional_settings = [
'policy_path' 'policy_path',
'enable_tls_client_auth'
] ]
def set_setting(self, setting, value): def set_setting(self, setting, value):
@ -80,8 +82,10 @@ class KmipServerConfig(object):
self._set_ca_path(value) self._set_ca_path(value)
elif setting == 'auth_suite': elif setting == 'auth_suite':
self._set_auth_suite(value) self._set_auth_suite(value)
else: elif setting == 'policy_path':
self._set_policy_path(value) self._set_policy_path(value)
else:
self._set_enable_tls_client_auth(value)
def load_settings(self, path): def load_settings(self, path):
""" """
@ -148,6 +152,10 @@ class KmipServerConfig(object):
self._set_auth_suite(parser.get('server', 'auth_suite')) self._set_auth_suite(parser.get('server', 'auth_suite'))
if parser.has_option('server', 'policy_path'): if parser.has_option('server', 'policy_path'):
self._set_policy_path(parser.get('server', 'policy_path')) self._set_policy_path(parser.get('server', 'policy_path'))
if parser.has_option('server', 'enable_tls_client_auth'):
self._set_enable_tls_client_auth(
parser.getboolean('server', 'enable_tls_client_auth')
)
def _set_hostname(self, value): def _set_hostname(self, value):
if isinstance(value, six.string_types): if isinstance(value, six.string_types):
@ -242,3 +250,14 @@ class KmipServerConfig(object):
"The policy path, if specified, must be a valid string path " "The policy path, if specified, must be a valid string path "
"to a filesystem directory." "to a filesystem directory."
) )
def _set_enable_tls_client_auth(self, value):
if value is None:
self.settings['enable_tls_client_auth'] = True
elif isinstance(value, bool):
self.settings['enable_tls_client_auth'] = value
else:
raise exceptions.ConfigurationError(
"The flag enabling the TLS certificate client auth flag check "
"must be a boolean."
)

View File

@ -50,7 +50,8 @@ class KmipServer(object):
auth_suite=None, auth_suite=None,
config_path='/etc/pykmip/server.conf', config_path='/etc/pykmip/server.conf',
log_path='/var/log/pykmip/server.log', log_path='/var/log/pykmip/server.log',
policy_path=None policy_path=None,
enable_tls_client_auth=None
): ):
""" """
Create a KmipServer. Create a KmipServer.
@ -95,6 +96,10 @@ class KmipServer(object):
policy_path (string): The path to the filesystem directory policy_path (string): The path to the filesystem directory
containing PyKMIP server operation policy JSON files. containing PyKMIP server operation policy JSON files.
Optional, defaults to None. Optional, defaults to None.
enable_tls_client_auth (boolean): A boolean indicating if the TLS
certificate client auth flag should be required for client
certificates when establishing a new client session. Optional,
defaults to None.
""" """
self._logger = logging.getLogger('kmip.server') self._logger = logging.getLogger('kmip.server')
self._setup_logging(log_path) self._setup_logging(log_path)
@ -108,7 +113,8 @@ class KmipServer(object):
key_path, key_path,
ca_path, ca_path,
auth_suite, auth_suite,
policy_path policy_path,
enable_tls_client_auth
) )
if self.config.settings.get('auth_suite') == 'TLS1.2': if self.config.settings.get('auth_suite') == 'TLS1.2':
@ -152,7 +158,8 @@ class KmipServer(object):
key_path=None, key_path=None,
ca_path=None, ca_path=None,
auth_suite=None, auth_suite=None,
policy_path=None policy_path=None,
enable_tls_client_auth=None
): ):
if path: if path:
self.config.load_settings(path) self.config.load_settings(path)
@ -171,6 +178,11 @@ class KmipServer(object):
self.config.set_setting('auth_suite', auth_suite) self.config.set_setting('auth_suite', auth_suite)
if policy_path: if policy_path:
self.config.set_setting('policy_path', policy_path) self.config.set_setting('policy_path', policy_path)
if enable_tls_client_auth is not None:
self.config.set_setting(
'enable_tls_client_auth',
enable_tls_client_auth
)
def start(self): def start(self):
""" """
@ -343,7 +355,10 @@ class KmipServer(object):
s = session.KmipSession( s = session.KmipSession(
self._engine, self._engine,
connection, connection,
name=session_name name=session_name,
enable_tls_client_auth=self.config.settings.get(
'enable_tls_client_auth'
)
) )
s.daemon = True s.daemon = True
s.start() s.start()
@ -478,6 +493,18 @@ def build_argument_parser():
"directory. Optional, defaults to None." "directory. Optional, defaults to None."
), ),
) )
parser.add_option(
"-i",
"--ignore_tls_client_auth",
action="store_true",
default=False,
dest="ignore_tls_client_auth",
help=(
"A boolean indicating whether or not the TLS certificate client "
"auth flag should be ignored when establishing client sessions. "
"Optional, defaults to None."
)
)
return parser return parser
@ -506,6 +533,8 @@ def main(args=None):
kwargs['log_path'] = opts.log_path kwargs['log_path'] = opts.log_path
if opts.policy_path: if opts.policy_path:
kwargs['policy_path'] = opts.policy_path kwargs['policy_path'] = opts.policy_path
if opts.ignore_tls_client_auth:
kwargs['enable_tls_client_auth'] = False
# Create and start the server. # Create and start the server.
s = KmipServer(**kwargs) s = KmipServer(**kwargs)

View File

@ -33,7 +33,11 @@ class KmipSession(threading.Thread):
A session thread representing a single KMIP client/server interaction. A session thread representing a single KMIP client/server interaction.
""" """
def __init__(self, engine, connection, name=None): def __init__(self,
engine,
connection,
name=None,
enable_tls_client_auth=True):
""" """
Create a KmipSession. Create a KmipSession.
@ -44,6 +48,10 @@ class KmipSession(threading.Thread):
representing a new KMIP connection. Required. representing a new KMIP connection. Required.
name (str): The name of the KmipSession. Optional, defaults to name (str): The name of the KmipSession. Optional, defaults to
None. None.
enable_tls_client_auth (bool): A flag that enables a strict check
for the client auth flag in the extended key usage extension
in client certificates when establishing the client/server TLS
connection. Optional, defaults to True.
""" """
super(KmipSession, self).__init__( super(KmipSession, self).__init__(
group=None, group=None,
@ -60,6 +68,8 @@ class KmipSession(threading.Thread):
self._engine = engine self._engine = engine
self._connection = connection self._connection = connection
self._enable_tls_client_auth = enable_tls_client_auth
self._max_buffer_size = 4096 self._max_buffer_size = 4096
self._max_request_size = 1048576 self._max_request_size = 1048576
self._max_response_size = 1048576 self._max_response_size = 1048576
@ -89,7 +99,7 @@ class KmipSession(threading.Thread):
def _get_client_identity(self): def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True) certificate_data = self._connection.getpeercert(binary_form=True)
try: try:
certificate = x509.load_der_x509_certificate( cert = x509.load_der_x509_certificate(
certificate_data, certificate_data,
backends.default_backend() backends.default_backend()
) )
@ -102,42 +112,44 @@ class KmipSession(threading.Thread):
"connection. Could not retrieve client identity." "connection. Could not retrieve client identity."
) )
try: if self._enable_tls_client_auth:
extended_key_usage = certificate.extensions.get_extension_for_oid( try:
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE extended_key_usage = cert.extensions.get_extension_for_oid(
).value x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
except x509.ExtensionNotFound: ).value
raise exceptions.PermissionDenied( except x509.ExtensionNotFound:
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage:
client_identities = certificate.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied( raise exceptions.PermissionDenied(
"The client certificate does not define a subject common " "The extended key usage extension is missing from the "
"name. Session client identity unavailable." "client certificate. Session client identity unavailable."
) )
raise exceptions.PermissionDenied( if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in \
"The extended key usage extension is not marked for client " extended_key_usage:
"authentication in the client certificate. Session client " raise exceptions.PermissionDenied(
"identity unavailable." "The extended key usage extension is not marked for "
"client authentication in the client certificate. Session "
"client identity unavailable."
)
client_identities = cert.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
) )
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
def _handle_message_loop(self): def _handle_message_loop(self):
request_data = self._receive_request() request_data = self._receive_request()

View File

@ -54,6 +54,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_key_path = mock.MagicMock() c._set_key_path = mock.MagicMock()
c._set_port = mock.MagicMock() c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock() c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
# Test the right error is generated when setting an unsupported # Test the right error is generated when setting an unsupported
# setting. # setting.
@ -92,6 +93,9 @@ class TestKmipServerConfig(testtools.TestCase):
c.set_setting('policy_path', '/etc/pykmip/policies') c.set_setting('policy_path', '/etc/pykmip/policies')
c._set_policy_path.assert_called_once_with('/etc/pykmip/policies') c._set_policy_path.assert_called_once_with('/etc/pykmip/policies')
c.set_setting('enable_tls_client_auth', False)
c._set_enable_tls_client_auth.assert_called_once_with(False)
def test_load_settings(self): def test_load_settings(self):
""" """
Test that the right calls are made and the right errors generated when Test that the right calls are made and the right errors generated when
@ -144,6 +148,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_key_path = mock.MagicMock() c._set_key_path = mock.MagicMock()
c._set_port = mock.MagicMock() c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock() c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
# Test that the right calls are made when correctly parsing settings. # Test that the right calls are made when correctly parsing settings.
parser = configparser.SafeConfigParser() parser = configparser.SafeConfigParser()
@ -155,6 +160,7 @@ class TestKmipServerConfig(testtools.TestCase):
parser.set('server', 'ca_path', '/test/path/ca.crt') parser.set('server', 'ca_path', '/test/path/ca.crt')
parser.set('server', 'auth_suite', 'Basic') parser.set('server', 'auth_suite', 'Basic')
parser.set('server', 'policy_path', '/test/path/policies') parser.set('server', 'policy_path', '/test/path/policies')
parser.set('server', 'enable_tls_client_auth', 'False')
c._parse_settings(parser) c._parse_settings(parser)
@ -167,6 +173,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_ca_path.assert_called_once_with('/test/path/ca.crt') c._set_ca_path.assert_called_once_with('/test/path/ca.crt')
c._set_auth_suite.assert_called_once_with('Basic') c._set_auth_suite.assert_called_once_with('Basic')
c._set_policy_path.assert_called_once_with('/test/path/policies') c._set_policy_path.assert_called_once_with('/test/path/policies')
c._set_enable_tls_client_auth.assert_called_once_with(False)
# Test that a ConfigurationError is generated when the expected # Test that a ConfigurationError is generated when the expected
# section is missing. # section is missing.
@ -520,3 +527,47 @@ class TestKmipServerConfig(testtools.TestCase):
*args *args
) )
self.assertNotEqual(1, c.settings.get('policy_path')) self.assertNotEqual(1, c.settings.get('policy_path'))
def test_set_enable_tls_client_auth(self):
"""
Test that the enable_tls_client_auth configuration property can be set
correctly.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
self.assertIn('enable_tls_client_auth', c.settings.keys())
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
# Test that the setting is set correctly with a valid value
c._set_enable_tls_client_auth(False)
self.assertEqual(
False,
c.settings.get('enable_tls_client_auth')
)
c._set_enable_tls_client_auth(None)
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
c._set_enable_tls_client_auth(True)
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
# Test that a ConfigurationError is generated when setting the wrong
# value.
args = ('invalid',)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The flag enabling the TLS certificate client auth flag check "
"must be a boolean.",
c._set_enable_tls_client_auth,
*args
)

View File

@ -119,7 +119,8 @@ class TestKmipServer(testtools.TestCase):
'/etc/pykmip/certs/server.key', '/etc/pykmip/certs/server.key',
'/etc/pykmip/certs/ca.crt', '/etc/pykmip/certs/ca.crt',
'Basic', 'Basic',
'/etc/pykmip/policies' '/etc/pykmip/policies',
False
) )
s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
@ -142,6 +143,10 @@ class TestKmipServer(testtools.TestCase):
'policy_path', 'policy_path',
'/etc/pykmip/policies' '/etc/pykmip/policies'
) )
s.config.set_setting.assert_any_call(
'enable_tls_client_auth',
False
)
# Test that an attempt is made to instantiate the TLS 1.2 auth suite # Test that an attempt is made to instantiate the TLS 1.2 auth suite
s = server.KmipServer( s = server.KmipServer(